diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index 511207d553..29dc613917 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -44,8 +44,9 @@ func main() {
 	partsCount := tm.SmallPartsCount + tm.BigPartsCount
 	blocksCount := tm.SmallBlocksCount + tm.BigBlocksCount
 	rowsCount := tm.SmallRowsCount + tm.BigRowsCount
-	logger.Infof("successfully opened storage %q in %s; partsCount: %d; blocksCount: %d; rowsCount: %d",
-		*storageDataPath, time.Since(startTime), partsCount, blocksCount, rowsCount)
+	sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
+	logger.Infof("successfully opened storage %q in %s; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
+		*storageDataPath, time.Since(startTime), partsCount, blocksCount, rowsCount, sizeBytes)
 
 	registerStorageMetrics(strg)
 
@@ -290,6 +291,16 @@ func registerStorageMetrics(strg *storage.Storage) {
 		return float64(idbm().BlocksCount)
 	})
 
+	metrics.NewGauge(`vm_data_size_bytes{type="storage/big"}`, func() float64 {
+		return float64(tm().BigSizeBytes)
+	})
+	metrics.NewGauge(`vm_data_size_bytes{type="storage/small"}`, func() float64 {
+		return float64(tm().SmallSizeBytes)
+	})
+	metrics.NewGauge(`vm_data_size_bytes{type="indexdb"}`, func() float64 {
+		return float64(idbm().SizeBytes)
+	})
+
 	metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
 		return float64(tm().BigRowsCount)
 	})
diff --git a/lib/fs/fs.go b/lib/fs/fs.go
index 8c8501ba88..0a124a361d 100644
--- a/lib/fs/fs.go
+++ b/lib/fs/fs.go
@@ -185,6 +185,18 @@ func MustClose(f *os.File) {
 	}
 }
 
+// MustFileSize returns file size for the given path.
+func MustFileSize(path string) uint64 {
+	fi, err := os.Stat(path)
+	if err != nil {
+		logger.Panicf("FATAL: cannot stat %q: %s", path, err)
+	}
+	if fi.IsDir() {
+		logger.Panicf("FATAL: %q must be a file, not a directory", path)
+	}
+	return uint64(fi.Size())
+}
+
 // IsPathExist returns whether the given path exists.
 func IsPathExist(path string) bool {
 	if _, err := os.Stat(path); err != nil {
diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go
index 897efa1c69..8297ec5f14 100644
--- a/lib/mergeset/inmemory_part.go
+++ b/lib/mergeset/inmemory_part.go
@@ -84,13 +84,18 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
 // It is unsafe re-using ip while the returned part is in use.
 func (ip *inmemoryPart) NewPart() *part {
 	ph := ip.ph
-	p, err := newPart(&ph, "", ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
+	size := ip.size()
+	p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
 	if err != nil {
 		logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
 	}
 	return p
 }
 
+func (ip *inmemoryPart) size() uint64 {
+	return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B))
+}
+
 func getInmemoryPart() *inmemoryPart {
 	v := ipPool.Get()
 	if v == nil {
diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go
index da0ba01c23..2257c6f4e2 100644
--- a/lib/mergeset/part.go
+++ b/lib/mergeset/part.go
@@ -48,6 +48,8 @@ type part struct {
 
 	path string
 
+	size uint64
+
 	mrs []metaindexRow
 
 	indexFile fs.ReadAtCloser
@@ -71,6 +73,7 @@ func openFilePart(path string) (*part, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot open %q: %s", metaindexPath, err)
 	}
+	metaindexSize := fs.MustFileSize(metaindexPath)
 
 	indexPath := path + "/index.bin"
 	indexFile, err := fs.OpenReaderAt(indexPath)
@@ -78,6 +81,7 @@ func openFilePart(path string) (*part, error) {
 		metaindexFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", indexPath, err)
 	}
+	indexSize := fs.MustFileSize(indexPath)
 
 	itemsPath := path + "/items.bin"
 	itemsFile, err := fs.OpenReaderAt(itemsPath)
@@ -86,6 +90,7 @@ func openFilePart(path string) (*part, error) {
 		metaindexFile.MustClose()
 		indexFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", itemsPath, err)
 	}
+	itemsSize := fs.MustFileSize(itemsPath)
 
 	lensPath := path + "/lens.bin"
 	lensFile, err := fs.OpenReaderAt(lensPath)
@@ -95,11 +100,13 @@ func openFilePart(path string) (*part, error) {
 		metaindexFile.MustClose()
 		indexFile.MustClose()
 		itemsFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", lensPath, err)
 	}
+	lensSize := fs.MustFileSize(lensPath)
 
-	return newPart(&ph, path, metaindexFile, indexFile, itemsFile, lensFile)
+	size := metaindexSize + indexSize + itemsSize + lensSize
+	return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
 }
 
-func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.ReadAtCloser) (*part, error) {
+func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.ReadAtCloser) (*part, error) {
 	var errors []error
 	mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
 	if err != nil {
@@ -109,6 +116,7 @@ func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser,
 
 	p := &part{
 		path: path,
+		size: size,
 		mrs:  mrs,
 
 		indexFile: indexFile,
diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go
index de6ac0576a..3e35f54a44 100644
--- a/lib/mergeset/part_search_test.go
+++ b/lib/mergeset/part_search_test.go
@@ -156,7 +156,8 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	if itemsMerged != uint64(len(items)) {
 		return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
 	}
-	p, err := newPart(&ip.ph, "partName", ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
+	size := ip.size()
+	p, err := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot create part: %s", err)
 	}
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index e73e08c16a..480bf9d590 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -161,8 +161,8 @@ func OpenTable(path string) (*Table, error) {
 
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
-	logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d",
-		path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount)
+	logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
+		path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)
 
 	return tb, nil
 }
@@ -242,6 +242,7 @@ type TableMetrics struct {
 
 	BlocksCount uint64
 	ItemsCount  uint64
+	SizeBytes   uint64
 
 	DataBlocksCacheSize     uint64
 	DataBlocksCacheRequests uint64
@@ -274,6 +275,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 
 		m.BlocksCount += p.ph.blocksCount
 		m.ItemsCount += p.ph.itemsCount
+		m.SizeBytes += p.size
 
 		m.DataBlocksCacheSize += p.ibCache.Len()
 		m.DataBlocksCacheRequests += p.ibCache.Requests()
@@ -727,6 +729,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	if err != nil {
 		return fmt.Errorf("cannot open merged part %q: %s", dstPartPath, err)
 	}
+	newPSize := newP.size
 	newPW := &partWrapper{
 		p:        newP,
 		refCount: 1,
@@ -761,7 +764,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d items in %s at %d items/sec to %q", outItemsCount, d, int(float64(outItemsCount)/d.Seconds()), dstPartPath)
+		logger.Infof("merged %d items in %s at %d items/sec to %q; bytesSize: %d", outItemsCount, d, int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 
 	return nil
diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go
index 2fb27f56c4..da8d78e56c 100644
--- a/lib/storage/inmemory_part.go
+++ b/lib/storage/inmemory_part.go
@@ -51,7 +51,8 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
 // It is unsafe re-using mp while the returned part is in use.
 func (mp *inmemoryPart) NewPart() (*part, error) {
 	ph := mp.ph
-	return newPart(&ph, "", mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
+	size := uint64(len(mp.timestampsData.B) + len(mp.valuesData.B) + len(mp.indexData.B) + len(mp.metaindexData.B))
+	return newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
 }
 
 func getInmemoryPart() *inmemoryPart {
diff --git a/lib/storage/part.go b/lib/storage/part.go
index 6378d3f0f1..9cf7ec7585 100644
--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -36,6 +36,9 @@ type part struct {
 	// Empty for in-memory part.
 	path string
 
+	// Total size in bytes of part data.
+	size uint64
+
 	timestampsFile fs.ReadAtCloser
 	valuesFile     fs.ReadAtCloser
 	indexFile      fs.ReadAtCloser
@@ -59,6 +62,7 @@ func openFilePart(path string) (*part, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot open timestamps file: %s", err)
 	}
+	timestampsSize := fs.MustFileSize(timestampsPath)
 
 	valuesPath := path + "/values.bin"
 	valuesFile, err := fs.OpenReaderAt(valuesPath)
@@ -66,6 +70,7 @@ func openFilePart(path string) (*part, error) {
 		timestampsFile.MustClose()
 		return nil, fmt.Errorf("cannot open values file: %s", err)
 	}
+	valuesSize := fs.MustFileSize(valuesPath)
 
 	indexPath := path + "/index.bin"
 	indexFile, err := fs.OpenReaderAt(indexPath)
@@ -74,6 +79,7 @@ func openFilePart(path string) (*part, error) {
 		timestampsFile.MustClose()
 		valuesFile.MustClose()
 		return nil, fmt.Errorf("cannot open index file: %s", err)
 	}
+	indexSize := fs.MustFileSize(indexPath)
 
 	metaindexPath := path + "/metaindex.bin"
 	metaindexFile, err := filestream.Open(metaindexPath, true)
@@ -83,15 +89,17 @@ func openFilePart(path string) (*part, error) {
 		timestampsFile.MustClose()
 		valuesFile.MustClose()
 		indexFile.MustClose()
 		return nil, fmt.Errorf("cannot open metaindex file: %s", err)
 	}
+	metaindexSize := fs.MustFileSize(metaindexPath)
 
-	return newPart(&ph, path, metaindexFile, timestampsFile, valuesFile, indexFile)
+	size := timestampsSize + valuesSize + indexSize + metaindexSize
+	return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
 }
 
 // newPart returns new part initialized with the given arguments.
 //
 // The returned part calls MustClose on all the files passed to newPart
 // when calling part.MustClose.
-func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.ReadAtCloser) (*part, error) {
+func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.ReadAtCloser) (*part, error) {
 	var errors []error
 	metaindex, err := unmarshalMetaindexRows(nil, metaindexReader)
 	if err != nil {
@@ -102,6 +110,7 @@ func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser,
 	p := &part{
 		ph:             *ph,
 		path:           path,
+		size:           size,
 		timestampsFile: timestampsFile,
 		valuesFile:     valuesFile,
 		indexFile:      indexFile,
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 0803f16a26..b0327fef9e 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -282,6 +282,9 @@ type partitionMetrics struct {
 	SmallIndexBlocksCacheRequests uint64
 	SmallIndexBlocksCacheMisses   uint64
 
+	BigSizeBytes   uint64
+	SmallSizeBytes uint64
+
 	BigRowsCount   uint64
 	SmallRowsCount uint64
 
@@ -326,6 +329,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 		m.BigIndexBlocksCacheMisses += p.ibCache.Misses()
 		m.BigRowsCount += p.ph.RowsCount
 		m.BigBlocksCount += p.ph.BlocksCount
+		m.BigSizeBytes += p.size
 		m.BigPartsRefCount += atomic.LoadUint64(&pw.refCount)
 	}
 
@@ -337,6 +341,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 		m.SmallIndexBlocksCacheMisses += p.ibCache.Misses()
 		m.SmallRowsCount += p.ph.RowsCount
 		m.SmallBlocksCount += p.ph.BlocksCount
+		m.SmallSizeBytes += p.size
 		m.SmallPartsRefCount += atomic.LoadUint64(&pw.refCount)
 	}
 
@@ -1013,12 +1018,14 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	}
 
 	var newPW *partWrapper
+	var newPSize uint64
 	if len(dstPartPath) > 0 {
 		// Open the merged part if it is non-empty.
 		newP, err := openFilePart(dstPartPath)
 		if err != nil {
 			return fmt.Errorf("cannot open merged part %q: %s", dstPartPath, err)
 		}
+		newPSize = newP.size
 		newPW = &partWrapper{
 			p:        newP,
 			refCount: 1,
@@ -1057,7 +1064,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d rows in %s at %d rows/sec to %q", outRowsCount, d, int(float64(outRowsCount)/d.Seconds()), dstPartPath)
+		logger.Infof("merged %d rows in %s at %d rows/sec to %q; sizeBytes: %d", outRowsCount, d, int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 
 	return nil
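
For context on the metric-export pattern used in app/vmstorage/main.go above (a callback-based gauge from github.com/VictoriaMetrics/metrics whose value is recomputed on every scrape), here is a minimal, self-contained sketch of the same idea. It is illustrative only and not part of this change: the -dataDir flag, the appDataSize helper, the app_data_size_bytes metric name and the listen port are hypothetical. It sums the sizes of regular files under a directory, much like the patch sums the per-part metaindex.bin, index.bin, items.bin and lens.bin sizes via fs.MustFileSize.

package main

import (
	"flag"
	"net/http"
	"os"
	"path/filepath"

	"github.com/VictoriaMetrics/metrics"
)

// dataDir is a hypothetical directory whose total on-disk size is exported.
var dataDir = flag.String("dataDir", "./data", "directory whose total file size is exported as a gauge")

// appDataSize sums the sizes of regular files under dataDir.
func appDataSize() float64 {
	var total int64
	_ = filepath.Walk(*dataDir, func(_ string, fi os.FileInfo, err error) error {
		if err == nil && fi.Mode().IsRegular() {
			total += fi.Size()
		}
		return nil
	})
	return float64(total)
}

func main() {
	flag.Parse()
	// Callback-based gauge: the closure runs on every /metrics scrape,
	// so the exported value always reflects the current on-disk size.
	metrics.NewGauge(`app_data_size_bytes{dir="data"}`, appDataSize)
	http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		metrics.WritePrometheus(w, true)
	})
	_ = http.ListenAndServe(":8428", nil)
}

The vm_data_size_bytes gauges registered in registerStorageMetrics follow the same shape; their callbacks read the SizeBytes counters that UpdateMetrics aggregates from each part's size field.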