all: add vm_data_size_bytes metrics for easy monitoring of on-disk data size and on-disk inverted index size

Aliaksandr Valialkin 2019-07-04 19:09:40 +03:00
parent 8d83dcf332
commit 56c154f45b
9 changed files with 70 additions and 13 deletions

View file

@@ -45,8 +45,9 @@ func Init() {
 	partsCount := tm.SmallPartsCount + tm.BigPartsCount
 	blocksCount := tm.SmallBlocksCount + tm.BigBlocksCount
 	rowsCount := tm.SmallRowsCount + tm.BigRowsCount
-	logger.Infof("successfully opened storage %q in %s; partsCount: %d; blocksCount: %d; rowsCount: %d",
-		*DataPath, time.Since(startTime), partsCount, blocksCount, rowsCount)
+	sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
+	logger.Infof("successfully opened storage %q in %s; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
+		*DataPath, time.Since(startTime), partsCount, blocksCount, rowsCount, sizeBytes)
 
 	registerStorageMetrics(Storage)
 }
@@ -340,6 +341,16 @@ func registerStorageMetrics(strg *storage.Storage) {
 		return float64(idbm().BlocksCount)
 	})
 
+	metrics.NewGauge(`vm_data_size_bytes{type="storage/big"}`, func() float64 {
+		return float64(tm().BigSizeBytes)
+	})
+	metrics.NewGauge(`vm_data_size_bytes{type="storage/small"}`, func() float64 {
+		return float64(tm().SmallSizeBytes)
+	})
+	metrics.NewGauge(`vm_data_size_bytes{type="indexdb"}`, func() float64 {
+		return float64(idbm().SizeBytes)
+	})
+
 	metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
 		return float64(tm().BigRowsCount)
 	})
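For context on the gauges above: metrics.NewGauge comes from github.com/VictoriaMetrics/metrics and registers a callback that is evaluated on every scrape, so the size values are read lazily. A minimal, self-contained sketch (not code from this commit; sizeBytes stands in for the tm()/idbm() values):

package main

import (
	"net/http"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	// Stand-in for the real on-disk size; in the commit this comes from
	// tm().BigSizeBytes, tm().SmallSizeBytes and idbm().SizeBytes.
	var sizeBytes uint64 = 123456

	// NewGauge registers a callback-backed gauge, evaluated on scrape.
	metrics.NewGauge(`vm_data_size_bytes{type="storage/big"}`, func() float64 {
		return float64(sizeBytes)
	})

	// Serve all registered metrics in Prometheus text format.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metrics.WritePrometheus(w, false)
	})
	http.ListenAndServe(":8428", nil)
}

Scraping /metrics on this toy server would then show vm_data_size_bytes{type="storage/big"} with the current value.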

View file

@@ -185,6 +185,18 @@ func MustClose(f *os.File) {
 	}
 }
 
+// MustFileSize returns file size for the given path.
+func MustFileSize(path string) uint64 {
+	fi, err := os.Stat(path)
+	if err != nil {
+		logger.Panicf("FATAL: cannot stat %q: %s", path, err)
+	}
+	if fi.IsDir() {
+		logger.Panicf("FATAL: %q must be a file, not a directory", path)
+	}
+	return uint64(fi.Size())
+}
+
 // IsPathExist returns whether the given path exists.
 func IsPathExist(path string) bool {
 	if _, err := os.Stat(path); err != nil {
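This helper is what the rest of the commit uses to compute per-part sizes at open time. A usage sketch, under the assumption that the caller passes a file the storage itself created (the path below is hypothetical):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

func main() {
	// MustFileSize panics (via logger.Panicf) on a missing path or a
	// directory, so it is only called on files that are known to exist.
	path := "/var/lib/victoria-metrics/data/big/2019_07/part/index.bin" // hypothetical
	fmt.Printf("%s occupies %d bytes\n", path, fs.MustFileSize(path))
}

The fail-fast Panicf style fits here because a part file vanishing underneath an open storage is unrecoverable anyway.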

View file

@@ -84,13 +84,18 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
 // It is unsafe re-using ip while the returned part is in use.
 func (ip *inmemoryPart) NewPart() *part {
 	ph := ip.ph
-	p, err := newPart(&ph, "", ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
+	size := ip.size()
+	p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
 	if err != nil {
 		logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
 	}
 	return p
 }
 
+func (ip *inmemoryPart) size() uint64 {
+	return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B))
+}
+
 func getInmemoryPart() *inmemoryPart {
 	v := ipPool.Get()
 	if v == nil {
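The new size() method makes in-memory parts report the same kind of total as on-disk parts: the sum of the four backing buffers (the .B fields in the diff are the raw byte slices of bytesutil.ByteBuffer). A toy, self-contained version of the idea with plain slices:

package main

import "fmt"

// memPart is a simplified stand-in for inmemoryPart; the real struct wraps
// its buffers in bytesutil.ByteBuffer, hence the .B accesses above.
type memPart struct {
	metaindexData, indexData, itemsData, lensData []byte
}

func (mp *memPart) size() uint64 {
	return uint64(len(mp.metaindexData) + len(mp.indexData) + len(mp.itemsData) + len(mp.lensData))
}

func main() {
	mp := &memPart{indexData: make([]byte, 4096), itemsData: make([]byte, 1024)}
	fmt.Println(mp.size()) // 5120
}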

View file

@@ -48,6 +48,8 @@ type part struct {
 	path string
 
+	size uint64
+
 	mrs []metaindexRow
 
 	indexFile fs.ReadAtCloser
@@ -71,6 +73,7 @@ func openFilePart(path string) (*part, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot open %q: %s", metaindexPath, err)
 	}
+	metaindexSize := fs.MustFileSize(metaindexPath)
 
 	indexPath := path + "/index.bin"
 	indexFile, err := fs.OpenReaderAt(indexPath)
@@ -78,6 +81,7 @@ func openFilePart(path string) (*part, error) {
 		metaindexFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", indexPath, err)
 	}
+	indexSize := fs.MustFileSize(indexPath)
 
 	itemsPath := path + "/items.bin"
 	itemsFile, err := fs.OpenReaderAt(itemsPath)
@@ -86,6 +90,7 @@ func openFilePart(path string) (*part, error) {
 		indexFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", itemsPath, err)
 	}
+	itemsSize := fs.MustFileSize(itemsPath)
 
 	lensPath := path + "/lens.bin"
 	lensFile, err := fs.OpenReaderAt(lensPath)
@@ -95,11 +100,13 @@ func openFilePart(path string) (*part, error) {
 		itemsFile.MustClose()
 		return nil, fmt.Errorf("cannot open %q: %s", lensPath, err)
 	}
+	lensSize := fs.MustFileSize(lensPath)
 
-	return newPart(&ph, path, metaindexFile, indexFile, itemsFile, lensFile)
+	size := metaindexSize + indexSize + itemsSize + lensSize
+	return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
 }
 
-func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.ReadAtCloser) (*part, error) {
+func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.ReadAtCloser) (*part, error) {
 	var errors []error
 	mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
 	if err != nil {
@@ -109,6 +116,7 @@ func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser,
 	p := &part{
 		path: path,
+		size: size,
 		mrs:  mrs,
 
 		indexFile: indexFile,
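Taken together, the hunks above establish the pattern used throughout this commit: stat each component file once while opening the part and cache the summed size on the struct, so metric reads never hit the filesystem. A standalone sketch of that pattern (helper names are illustrative, not the repo's):

package main

import (
	"fmt"
	"os"
)

// mustFileSize mirrors the semantics of the new fs.MustFileSize helper,
// with panic in place of logger.Panicf.
func mustFileSize(path string) uint64 {
	fi, err := os.Stat(path)
	if err != nil {
		panic(fmt.Sprintf("cannot stat %q: %s", path, err))
	}
	return uint64(fi.Size())
}

// openPartSize sums the component-file sizes of a mergeset part the way
// openFilePart now does before handing the total to newPart.
func openPartSize(partDir string) uint64 {
	var size uint64
	for _, name := range []string{"metaindex.bin", "index.bin", "items.bin", "lens.bin"} {
		size += mustFileSize(partDir + "/" + name)
	}
	return size
}

func main() {
	// Hypothetical part directory; panics unless the four files exist.
	fmt.Println(openPartSize("/var/lib/victoria-metrics/indexdb/part"))
}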

View file

@@ -156,7 +156,8 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	if itemsMerged != uint64(len(items)) {
 		return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items))
 	}
-	p, err := newPart(&ip.ph, "partName", ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
+	size := ip.size()
+	p, err := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot create part: %s", err)
 	}

View file

@@ -161,8 +161,8 @@ func OpenTable(path string) (*Table, error) {
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
-	logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d",
-		path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount)
+	logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
+		path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)
 
 	return tb, nil
 }
@@ -242,6 +242,7 @@ type TableMetrics struct {
 	BlocksCount uint64
 	ItemsCount  uint64
+	SizeBytes   uint64
 
 	DataBlocksCacheSize     uint64
 	DataBlocksCacheRequests uint64
@@ -274,6 +275,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 		m.BlocksCount += p.ph.blocksCount
 		m.ItemsCount += p.ph.itemsCount
+		m.SizeBytes += p.size
 
 		m.DataBlocksCacheSize += p.ibCache.Len()
 		m.DataBlocksCacheRequests += p.ibCache.Requests()
@@ -727,6 +729,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	if err != nil {
 		return fmt.Errorf("cannot open merged part %q: %s", dstPartPath, err)
 	}
+	newPSize := newP.size
 	newPW := &partWrapper{
 		p:        newP,
 		refCount: 1,
@@ -761,7 +764,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d items in %s at %d items/sec to %q", outItemsCount, d, int(float64(outItemsCount)/d.Seconds()), dstPartPath)
+		logger.Infof("merged %d items in %s at %d items/sec to %q; bytesSize: %d", outItemsCount, d, int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 
 	return nil
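UpdateMetrics accumulates into a caller-supplied TableMetrics value, so the new SizeBytes total costs one addition per part and no I/O on each scrape. A toy version of that additive pattern with simplified types:

package main

import "fmt"

type part struct{ size uint64 }

type TableMetrics struct{ SizeBytes uint64 }

type Table struct{ parts []*part }

// UpdateMetrics adds this table's totals into m, matching the additive
// style of the real method so callers can aggregate several tables.
func (tb *Table) UpdateMetrics(m *TableMetrics) {
	for _, p := range tb.parts {
		m.SizeBytes += p.size
	}
}

func main() {
	tb := &Table{parts: []*part{{size: 4 << 20}, {size: 1 << 20}}}
	var m TableMetrics
	tb.UpdateMetrics(&m)
	fmt.Println(m.SizeBytes) // 5242880
}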

View file

@@ -51,7 +51,8 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
 // It is unsafe re-using mp while the returned part is in use.
 func (mp *inmemoryPart) NewPart() (*part, error) {
 	ph := mp.ph
-	return newPart(&ph, "", mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
+	size := uint64(len(mp.timestampsData.B) + len(mp.valuesData.B) + len(mp.indexData.B) + len(mp.metaindexData.B))
+	return newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
 }
 
 func getInmemoryPart() *inmemoryPart {

View file

@@ -36,6 +36,9 @@ type part struct {
 	// Empty for in-memory part.
 	path string
 
+	// Total size in bytes of part data.
+	size uint64
+
 	timestampsFile fs.ReadAtCloser
 	valuesFile     fs.ReadAtCloser
 	indexFile      fs.ReadAtCloser
@@ -59,6 +62,7 @@ func openFilePart(path string) (*part, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot open timestamps file: %s", err)
 	}
+	timestampsSize := fs.MustFileSize(timestampsPath)
 
 	valuesPath := path + "/values.bin"
 	valuesFile, err := fs.OpenReaderAt(valuesPath)
@@ -66,6 +70,7 @@ func openFilePart(path string) (*part, error) {
 		timestampsFile.MustClose()
 		return nil, fmt.Errorf("cannot open values file: %s", err)
 	}
+	valuesSize := fs.MustFileSize(valuesPath)
 
 	indexPath := path + "/index.bin"
 	indexFile, err := fs.OpenReaderAt(indexPath)
@@ -74,6 +79,7 @@ func openFilePart(path string) (*part, error) {
 		valuesFile.MustClose()
 		return nil, fmt.Errorf("cannot open index file: %s", err)
 	}
+	indexSize := fs.MustFileSize(indexPath)
 
 	metaindexPath := path + "/metaindex.bin"
 	metaindexFile, err := filestream.Open(metaindexPath, true)
@@ -83,15 +89,17 @@ func openFilePart(path string) (*part, error) {
 		indexFile.MustClose()
 		return nil, fmt.Errorf("cannot open metaindex file: %s", err)
 	}
+	metaindexSize := fs.MustFileSize(metaindexPath)
 
-	return newPart(&ph, path, metaindexFile, timestampsFile, valuesFile, indexFile)
+	size := timestampsSize + valuesSize + indexSize + metaindexSize
+	return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
 }
 
 // newPart returns new part initialized with the given arguments.
 //
 // The returned part calls MustClose on all the files passed to newPart
 // when calling part.MustClose.
-func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.ReadAtCloser) (*part, error) {
+func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, timestampsFile, valuesFile, indexFile fs.ReadAtCloser) (*part, error) {
 	var errors []error
 	metaindex, err := unmarshalMetaindexRows(nil, metaindexReader)
 	if err != nil {
@@ -102,6 +110,7 @@ func newPart(ph *partHeader, path string, metaindexReader filestream.ReadCloser,
 	p := &part{
 		ph:             *ph,
 		path:           path,
+		size:           size,
 		timestampsFile: timestampsFile,
 		valuesFile:     valuesFile,
 		indexFile:      indexFile,

View file

@@ -282,6 +282,9 @@ type partitionMetrics struct {
 	SmallIndexBlocksCacheRequests uint64
 	SmallIndexBlocksCacheMisses   uint64
 
+	BigSizeBytes   uint64
+	SmallSizeBytes uint64
+
 	BigRowsCount   uint64
 	SmallRowsCount uint64
@@ -326,6 +329,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 		m.BigIndexBlocksCacheMisses += p.ibCache.Misses()
 		m.BigRowsCount += p.ph.RowsCount
 		m.BigBlocksCount += p.ph.BlocksCount
+		m.BigSizeBytes += p.size
 		m.BigPartsRefCount += atomic.LoadUint64(&pw.refCount)
 	}
@@ -337,6 +341,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 		m.SmallIndexBlocksCacheMisses += p.ibCache.Misses()
 		m.SmallRowsCount += p.ph.RowsCount
 		m.SmallBlocksCount += p.ph.BlocksCount
+		m.SmallSizeBytes += p.size
 		m.SmallPartsRefCount += atomic.LoadUint64(&pw.refCount)
 	}
@@ -1013,12 +1018,14 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	}
 
 	var newPW *partWrapper
+	var newPSize uint64
 	if len(dstPartPath) > 0 {
 		// Open the merged part if it is non-empty.
 		newP, err := openFilePart(dstPartPath)
 		if err != nil {
 			return fmt.Errorf("cannot open merged part %q: %s", dstPartPath, err)
 		}
+		newPSize = newP.size
 		newPW = &partWrapper{
 			p:        newP,
 			refCount: 1,
@@ -1057,7 +1064,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d rows in %s at %d rows/sec to %q", outRowsCount, d, int(float64(outRowsCount)/d.Seconds()), dstPartPath)
+		logger.Infof("merged %d rows in %s at %d rows/sec to %q; sizeBytes: %d", outRowsCount, d, int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 
 	return nil
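Both merge paths log the merged part's size only when the merge ran long, using newPSize captured before the part wrapper is handed off. A self-contained sketch of that throttled-logging pattern (the 10-second threshold matches the diff; the function name is illustrative):

package main

import (
	"fmt"
	"time"
)

// logSlowMerge mirrors the diff's pattern: stay quiet for fast merges and
// report throughput plus the merged part's size for slow ones.
func logSlowMerge(outRowsCount uint64, startTime time.Time, dstPartPath string, newPSize uint64) {
	d := time.Since(startTime)
	if d <= 10*time.Second {
		return
	}
	fmt.Printf("merged %d rows in %s at %d rows/sec to %q; sizeBytes: %d\n",
		outRowsCount, d, int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
}

func main() {
	logSlowMerge(1000000, time.Now().Add(-15*time.Second), "/data/small/2019_07/part", 42<<20)
}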