mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: remove interface conversion in hot path during block merging
This should improve merge speed a bit for parts with big number of small blocks.
This commit is contained in:
parent
26ffc77622
commit
386c349c8c
2 changed files with 38 additions and 11 deletions
|
@ -25,9 +25,13 @@ type blockStreamReader struct {
|
||||||
|
|
||||||
ph partHeader
|
ph partHeader
|
||||||
|
|
||||||
timestampsReader filestream.ReadCloser
|
// Use io.Reader type for timestampsReader and valuesReader
|
||||||
valuesReader filestream.ReadCloser
|
// in order to remove I2I conversion in readBlock
|
||||||
indexReader filestream.ReadCloser
|
// when passing them to fs.ReadFullData
|
||||||
|
timestampsReader io.Reader
|
||||||
|
valuesReader io.Reader
|
||||||
|
|
||||||
|
indexReader filestream.ReadCloser
|
||||||
|
|
||||||
mrs []metaindexRow
|
mrs []metaindexRow
|
||||||
|
|
||||||
|
@ -56,6 +60,11 @@ type blockStreamReader struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bsr *blockStreamReader) assertWriteClosers() {
|
||||||
|
_ = bsr.timestampsReader.(filestream.ReadCloser)
|
||||||
|
_ = bsr.valuesReader.(filestream.ReadCloser)
|
||||||
|
}
|
||||||
|
|
||||||
func (bsr *blockStreamReader) reset() {
|
func (bsr *blockStreamReader) reset() {
|
||||||
bsr.Block.Reset()
|
bsr.Block.Reset()
|
||||||
|
|
||||||
|
@ -108,6 +117,8 @@ func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err)
|
logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bsr.assertWriteClosers()
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitFromFilePart initializes bsr from a file-based part on the given path.
|
// InitFromFilePart initializes bsr from a file-based part on the given path.
|
||||||
|
@ -167,6 +178,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
||||||
bsr.indexReader = indexFile
|
bsr.indexReader = indexFile
|
||||||
bsr.mrs = mrs
|
bsr.mrs = mrs
|
||||||
|
|
||||||
|
bsr.assertWriteClosers()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,8 +187,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
||||||
//
|
//
|
||||||
// It closes *Reader files passed to Init.
|
// It closes *Reader files passed to Init.
|
||||||
func (bsr *blockStreamReader) MustClose() {
|
func (bsr *blockStreamReader) MustClose() {
|
||||||
bsr.timestampsReader.MustClose()
|
bsr.timestampsReader.(filestream.ReadCloser).MustClose()
|
||||||
bsr.valuesReader.MustClose()
|
bsr.valuesReader.(filestream.ReadCloser).MustClose()
|
||||||
bsr.indexReader.MustClose()
|
bsr.indexReader.MustClose()
|
||||||
|
|
||||||
bsr.reset()
|
bsr.reset()
|
||||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -17,10 +18,14 @@ type blockStreamWriter struct {
|
||||||
compressLevel int
|
compressLevel int
|
||||||
path string
|
path string
|
||||||
|
|
||||||
timestampsWriter filestream.WriteCloser
|
// Use io.Writer type for timestampsWriter and valuesWriter
|
||||||
valuesWriter filestream.WriteCloser
|
// in order to remove I2I conversion in WriteExternalBlock
|
||||||
indexWriter filestream.WriteCloser
|
// when passing them to fs.MustWriteData
|
||||||
metaindexWriter filestream.WriteCloser
|
timestampsWriter io.Writer
|
||||||
|
valuesWriter io.Writer
|
||||||
|
|
||||||
|
indexWriter filestream.WriteCloser
|
||||||
|
metaindexWriter filestream.WriteCloser
|
||||||
|
|
||||||
mr metaindexRow
|
mr metaindexRow
|
||||||
|
|
||||||
|
@ -35,6 +40,11 @@ type blockStreamWriter struct {
|
||||||
compressedMetaindexData []byte
|
compressedMetaindexData []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bsw *blockStreamWriter) assertWriteClosers() {
|
||||||
|
_ = bsw.timestampsWriter.(filestream.WriteCloser)
|
||||||
|
_ = bsw.valuesWriter.(filestream.WriteCloser)
|
||||||
|
}
|
||||||
|
|
||||||
// Init initializes bsw with the given writers.
|
// Init initializes bsw with the given writers.
|
||||||
func (bsw *blockStreamWriter) reset() {
|
func (bsw *blockStreamWriter) reset() {
|
||||||
bsw.compressLevel = 0
|
bsw.compressLevel = 0
|
||||||
|
@ -67,6 +77,8 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
|
||||||
bsw.valuesWriter = &mp.valuesData
|
bsw.valuesWriter = &mp.valuesData
|
||||||
bsw.indexWriter = &mp.indexData
|
bsw.indexWriter = &mp.indexData
|
||||||
bsw.metaindexWriter = &mp.metaindexData
|
bsw.metaindexWriter = &mp.metaindexData
|
||||||
|
|
||||||
|
bsw.assertWriteClosers()
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitFromFilePart initializes bsw from a file-based part on the given path.
|
// InitFromFilePart initializes bsw from a file-based part on the given path.
|
||||||
|
@ -126,6 +138,8 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||||
bsw.indexWriter = indexFile
|
bsw.indexWriter = indexFile
|
||||||
bsw.metaindexWriter = metaindexFile
|
bsw.metaindexWriter = metaindexFile
|
||||||
|
|
||||||
|
bsw.assertWriteClosers()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,8 +155,8 @@ func (bsw *blockStreamWriter) MustClose() {
|
||||||
fs.MustWriteData(bsw.metaindexWriter, bsw.compressedMetaindexData)
|
fs.MustWriteData(bsw.metaindexWriter, bsw.compressedMetaindexData)
|
||||||
|
|
||||||
// Close writers.
|
// Close writers.
|
||||||
bsw.timestampsWriter.MustClose()
|
bsw.timestampsWriter.(filestream.WriteCloser).MustClose()
|
||||||
bsw.valuesWriter.MustClose()
|
bsw.valuesWriter.(filestream.WriteCloser).MustClose()
|
||||||
bsw.indexWriter.MustClose()
|
bsw.indexWriter.MustClose()
|
||||||
bsw.metaindexWriter.MustClose()
|
bsw.metaindexWriter.MustClose()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue