diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index 8a3d174f3..f2157b228 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -8,11 +8,9 @@ import (
 	"strings"
 	"time"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
@@ -43,9 +41,6 @@ func getDefaultMaxConcurrentRequests() int {
 
 // Init initializes vmselect
 func Init() {
-	tmpDirPath := *vmstorage.DataPath + "/tmp"
-	fs.RemoveDirContents(tmpDirPath)
-	netstorage.InitTmpBlocksDir(tmpDirPath)
 	promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
 
 	concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index ac16b2113..570be1cbb 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -53,9 +53,8 @@ type Results struct {
 	fetchData bool
 	deadline  Deadline
 
-	tbf *tmpBlocksFile
-
 	packedTimeseries []packedTimeseries
+	sr               *storage.Search
 }
 
 // Len returns the number of results in rss.
@@ -65,8 +64,12 @@ func (rss *Results) Len() int {
 
 // Cancel cancels rss work.
 func (rss *Results) Cancel() {
-	putTmpBlocksFile(rss.tbf)
-	rss.tbf = nil
+	rss.mustClose()
+}
+
+func (rss *Results) mustClose() {
+	putStorageSearch(rss.sr)
+	rss.sr = nil
 }
 
 // RunParallel runs in parallel f for all the results from rss.
@@ -76,10 +79,7 @@ func (rss *Results) Cancel() {
 //
 // rss becomes unusable after the call to RunParallel.
 func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
-	defer func() {
-		putTmpBlocksFile(rss.tbf)
-		rss.tbf = nil
-	}()
+	defer rss.mustClose()
 
 	workersCount := 1 + len(rss.packedTimeseries)/32
 	if workersCount > gomaxprocs {
@@ -106,7 +106,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 			break
 		}
-		if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
+		if err = pts.Unpack(rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
 			break
 		}
 		if len(rs.Timestamps) == 0 && rss.fetchData {
@@ -156,18 +156,18 @@ var gomaxprocs = runtime.GOMAXPROCS(-1)
 
 type packedTimeseries struct {
 	metricName string
-	addrs      []tmpBlockAddr
+	brs        []storage.BlockRef
 }
 
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
+func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
 	dst.reset()
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %s", pts.metricName, err)
 	}
 
-	workersCount := 1 + len(pts.addrs)/32
+	workersCount := 1 + len(pts.brs)/32
 	if workersCount > maxWorkersCount {
 		workersCount = maxWorkersCount
 	}
 
@@ -175,19 +175,19 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 		logger.Panicf("BUG: workersCount cannot be zero")
 	}
 
-	sbs := make([]*sortBlock, 0, len(pts.addrs))
+	sbs := make([]*sortBlock, 0, len(pts.brs))
 	var sbsLock sync.Mutex
 
-	workCh := make(chan tmpBlockAddr, workersCount)
+	workCh := make(chan storage.BlockRef, workersCount)
 	doneCh := make(chan error)
 
 	// Start workers
 	for i := 0; i < workersCount; i++ {
 		go func() {
 			var err error
-			for addr := range workCh {
+			for br := range workCh {
 				sb := getSortBlock()
-				if err = sb.unpackFrom(tbf, addr, tr, fetchData); err != nil {
+				if err = sb.unpackFrom(br, tr, fetchData); err != nil {
 					break
 				}
 
@@ -204,10 +204,10 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 	}
 
 	// Feed workers with work
-	for _, addr := range pts.addrs {
-		workCh <- addr
+	for _, br := range pts.brs {
+		workCh <- br
 	}
-	pts.addrs = pts.addrs[:0]
+	pts.brs = pts.brs[:0]
 	close(workCh)
 
 	// Wait until workers finish
@@ -314,8 +314,8 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
 
-func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool) error {
-	tbf.MustReadBlockAt(&sb.b, addr)
+func (sb *sortBlock) unpackFrom(br storage.BlockRef, tr storage.TimeRange, fetchData bool) error {
+	br.MustReadBlock(&sb.b, fetchData)
 	if fetchData {
 		if err := sb.b.UnmarshalData(); err != nil {
 			return fmt.Errorf("cannot unmarshal block: %s", err)
@@ -483,6 +483,8 @@ func putStorageSearch(sr *storage.Search) {
 var ssPool sync.Pool
 
 // ProcessSearchQuery performs sq on storage nodes until the given deadline.
+//
+// Results.RunParallel or Results.Cancel must be called on the returned Results.
 func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) {
 	// Setup search.
 	tfss, err := setupTfss(sq.TagFilterss)
@@ -498,56 +500,40 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
 
 	defer vmstorage.WG.Done()
 
 	sr := getStorageSearch()
-	defer putStorageSearch(sr)
-	sr.Init(vmstorage.Storage, tfss, tr, fetchData, *maxMetricsPerSearch)
+	sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch)
 
-	tbf := getTmpBlocksFile()
-	m := make(map[string][]tmpBlockAddr)
+	m := make(map[string][]storage.BlockRef)
 	var orderedMetricNames []string
 	blocksRead := 0
-	bb := tmpBufPool.Get()
-	defer tmpBufPool.Put(bb)
 	for sr.NextMetricBlock() {
 		blocksRead++
-		bb.B = storage.MarshalBlock(bb.B[:0], sr.MetricBlock.Block)
-		addr, err := tbf.WriteBlockData(bb.B)
-		if err != nil {
-			putTmpBlocksFile(tbf)
-			return nil, fmt.Errorf("cannot write data block #%d to temporary blocks file: %s", blocksRead, err)
-		}
 		if time.Until(deadline.Deadline) < 0 {
-			putTmpBlocksFile(tbf)
 			return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
 		}
-		metricName := sr.MetricBlock.MetricName
-		addrs := m[string(metricName)]
-		if len(addrs) == 0 {
+		metricName := sr.MetricBlockRef.MetricName
+		brs := m[string(metricName)]
+		if len(brs) == 0 {
 			orderedMetricNames = append(orderedMetricNames, string(metricName))
 		}
-		m[string(metricName)] = append(addrs, addr)
+		m[string(metricName)] = append(brs, *sr.MetricBlockRef.BlockRef)
 	}
 	if err := sr.Error(); err != nil {
-		putTmpBlocksFile(tbf)
 		return nil, fmt.Errorf("search error after reading %d data blocks: %s", blocksRead, err)
 	}
-	if err := tbf.Finalize(); err != nil {
-		putTmpBlocksFile(tbf)
-		return nil, fmt.Errorf("cannot finalize temporary blocks file with %d blocks: %s", blocksRead, err)
-	}
 
 	var rss Results
 	rss.tr = tr
 	rss.fetchData = fetchData
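 	rss.deadline = deadline
-	rss.tbf = tbf
 	pts := make([]packedTimeseries, len(orderedMetricNames))
 	for i, metricName := range orderedMetricNames {
 		pts[i] = packedTimeseries{
 			metricName: metricName,
-			addrs:      m[metricName],
+			brs:        m[metricName],
 		}
 	}
 	rss.packedTimeseries = pts
+	rss.sr = sr
 	return &rss, nil
 }

Note on the netstorage changes above: ProcessSearchQuery no longer spools matched blocks into a temporary file; it keeps the underlying storage.Search open inside Results and hands out storage.BlockRef values instead. That is why the added doc comment insists that Results.RunParallel or Results.Cancel must be called. A minimal sketch of a conforming caller (the function, its arguments and the error handling are illustrative assumptions, not part of this patch):

    package example // hypothetical consumer, not part of this patch

    import (
        "sync/atomic"

        "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
    )

    // countPoints illustrates the ownership contract: the Results returned by
    // ProcessSearchQuery now hold an open storage.Search, so RunParallel (or
    // Cancel) must always be called to release it.
    func countPoints(sq *storage.SearchQuery, deadline netstorage.Deadline) (uint64, error) {
        rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
        if err != nil {
            return 0, err
        }
        var total uint64
        // RunParallel unpacks each series from its BlockRefs and releases rss.sr when done.
        err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
            atomic.AddUint64(&total, uint64(len(rs.Timestamps)))
        })
        return total, err
    }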
diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go
deleted file mode 100644
index 7a278898f..000000000
--- a/app/vmselect/netstorage/tmp_blocks_file.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package netstorage
-
-import (
-	"fmt"
-	"io/ioutil"
-	"os"
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/metrics"
-)
-
-// InitTmpBlocksDir initializes directory to store temporary search results.
-//
-// It stores data in system-defined temporary directory if tmpDirPath is empty.
-func InitTmpBlocksDir(tmpDirPath string) {
-	if len(tmpDirPath) == 0 {
-		tmpDirPath = os.TempDir()
-	}
-	tmpBlocksDir = tmpDirPath + "/searchResults"
-	fs.MustRemoveAll(tmpBlocksDir)
-	if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
-		logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
-	}
-}
-
-var tmpBlocksDir string
-
-func maxInmemoryTmpBlocksFile() int {
-	mem := memory.Allowed()
-	maxLen := mem / 1024
-	if maxLen < 64*1024 {
-		return 64 * 1024
-	}
-	if maxLen > 4*1024*1024 {
-		return 4 * 1024 * 1024
-	}
-	return maxLen
-}
-
-var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
-	return float64(maxInmemoryTmpBlocksFile())
-})
-
-type tmpBlocksFile struct {
-	buf []byte
-
-	f *os.File
-	r *fs.ReaderAt
-
-	offset uint64
-}
-
-func getTmpBlocksFile() *tmpBlocksFile {
-	v := tmpBlocksFilePool.Get()
-	if v == nil {
-		return &tmpBlocksFile{
-			buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
-		}
-	}
-	return v.(*tmpBlocksFile)
-}
-
-func putTmpBlocksFile(tbf *tmpBlocksFile) {
-	tbf.MustClose()
-	tbf.buf = tbf.buf[:0]
-	tbf.f = nil
-	tbf.r = nil
-	tbf.offset = 0
-	tmpBlocksFilePool.Put(tbf)
-}
-
-var tmpBlocksFilePool sync.Pool
-
-type tmpBlockAddr struct {
-	offset uint64
-	size   int
-}
-
-func (addr tmpBlockAddr) String() string {
-	return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
-}
-
-var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
-
-// WriteBlockData writes b to tbf.
-//
-// It returns errors since the operation may fail on space shortage
-// and this must be handled.
-func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
-	var addr tmpBlockAddr
-	addr.offset = tbf.offset
-	addr.size = len(b)
-	tbf.offset += uint64(addr.size)
-	if len(tbf.buf)+len(b) <= cap(tbf.buf) {
-		// Fast path - the data fits tbf.buf
-		tbf.buf = append(tbf.buf, b...)
-		return addr, nil
-	}
-
-	// Slow path: flush the data from tbf.buf to file.
-	if tbf.f == nil {
-		f, err := ioutil.TempFile(tmpBlocksDir, "")
-		if err != nil {
-			return addr, err
-		}
-		tbf.f = f
-		tmpBlocksFilesCreated.Inc()
-	}
-	_, err := tbf.f.Write(tbf.buf)
-	tbf.buf = append(tbf.buf[:0], b...)
-	if err != nil {
-		return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
-	}
-	return addr, nil
-}
-
-func (tbf *tmpBlocksFile) Finalize() error {
-	if tbf.f == nil {
-		return nil
-	}
-	fname := tbf.f.Name()
-	if _, err := tbf.f.Write(tbf.buf); err != nil {
-		return fmt.Errorf("cannot write the remaining %d bytes to %q: %s", len(tbf.buf), fname, err)
-	}
-	tbf.buf = tbf.buf[:0]
-	r, err := fs.OpenReaderAt(fname)
-	if err != nil {
-		logger.Panicf("FATAL: cannot open %q: %s", fname, err)
-	}
-	// Hint the OS that the file is read almost sequentiallly.
-	// This should reduce the number of disk seeks, which is important
-	// for HDDs.
-	r.MustFadviseSequentialRead(true)
-	tbf.r = r
-	return nil
-}
-
-func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
-	var buf []byte
-	if tbf.f == nil {
-		buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
-	} else {
-		bb := tmpBufPool.Get()
-		defer tmpBufPool.Put(bb)
-		bb.B = bytesutil.Resize(bb.B, addr.size)
-		tbf.r.MustReadAt(bb.B, int64(addr.offset))
-		buf = bb.B
-	}
-	tail, err := storage.UnmarshalBlock(dst, buf)
-	if err != nil {
-		logger.Panicf("FATAL: cannot unmarshal data at %s: %s", addr, err)
-	}
-	if len(tail) > 0 {
-		logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling data at %s; len(tail)=%d", addr, len(tail))
-	}
-}
-
-var tmpBufPool bytesutil.ByteBufferPool
-
-func (tbf *tmpBlocksFile) MustClose() {
-	if tbf.f == nil {
-		return
-	}
-	if tbf.r != nil {
-		// tbf.r could be nil if Finalize wasn't called.
-		tbf.r.MustClose()
-	}
-	fname := tbf.f.Name()
-
-	// Remove the file at first, then close it.
-	// This way the OS shouldn't try to flush file contents to storage
-	// on close.
-	if err := os.Remove(fname); err != nil {
-		logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
-	}
-	if err := tbf.f.Close(); err != nil {
-		logger.Panicf("FATAL: cannot close %q: %s", fname, err)
-	}
-	tbf.f = nil
-}
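Note: the tmpBlocksFile deleted above buffered marshaled blocks in memory and spilled them to a file under the vmselect tmp dir ("<dataPath>/tmp" + "/searchResults") once the buffer overflowed. The in-memory cap was memory.Allowed()/1024, clamped to [64KiB, 4MiB]; for example, with 8GiB of allowed memory, 8GiB/1024 = 8MiB, which is capped at 4MiB. A standalone restatement of that clamp, with an illustrative function name:

    // clampTmpBufSize restates the deleted maxInmemoryTmpBlocksFile limit:
    // allowedMem/1024, bounded to the range [64KiB, 4MiB].
    func clampTmpBufSize(allowedMem int) int {
        maxLen := allowedMem / 1024
        if maxLen < 64*1024 {
            return 64 * 1024
        }
        if maxLen > 4*1024*1024 {
            return 4 * 1024 * 1024
        }
        return maxLen
    }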
diff --git a/app/vmselect/netstorage/tmp_blocks_file_test.go b/app/vmselect/netstorage/tmp_blocks_file_test.go
deleted file mode 100644
index eba2292a9..000000000
--- a/app/vmselect/netstorage/tmp_blocks_file_test.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package netstorage
-
-import (
-	"fmt"
-	"math/rand"
-	"os"
-	"reflect"
-	"testing"
-	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-)
-
-func TestMain(m *testing.M) {
-	rand.Seed(time.Now().UnixNano())
-	tmpDir := "TestTmpBlocks"
-	InitTmpBlocksDir(tmpDir)
-	statusCode := m.Run()
-	if err := os.RemoveAll(tmpDir); err != nil {
-		logger.Panicf("cannot remove %q: %s", tmpDir, err)
-	}
-	os.Exit(statusCode)
-}
-
-func TestTmpBlocksFileSerial(t *testing.T) {
-	if err := testTmpBlocksFile(); err != nil {
-		t.Fatalf("unexpected error: %s", err)
-	}
-}
-
-func TestTmpBlocksFileConcurrent(t *testing.T) {
-	concurrency := 3
-	ch := make(chan error, concurrency)
-	for i := 0; i < concurrency; i++ {
-		go func() {
-			ch <- testTmpBlocksFile()
-		}()
-	}
-	for i := 0; i < concurrency; i++ {
-		select {
-		case err := <-ch:
-			if err != nil {
-				t.Fatalf("unexpected error: %s", err)
-			}
-		case <-time.After(30 * time.Second):
-			t.Fatalf("timeout")
-		}
-	}
-}
-
-func testTmpBlocksFile() error {
-	createBlock := func() *storage.Block {
-		rowsCount := rand.Intn(8000) + 1
-		var timestamps, values []int64
-		ts := int64(rand.Intn(1023434))
-		for i := 0; i < rowsCount; i++ {
-			ts += int64(rand.Intn(1000) + 1)
-			timestamps = append(timestamps, ts)
-			values = append(values, int64(i*i+rand.Intn(20)))
-		}
-		tsid := &storage.TSID{
-			MetricID: 234211,
-		}
-		scale := int16(rand.Intn(123))
-		precisionBits := uint8(rand.Intn(63) + 1)
-		var b storage.Block
-		b.Init(tsid, timestamps, values, scale, precisionBits)
-		_, _, _ = b.MarshalData(0, 0)
-		return &b
-	}
-	for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
-		err := func() error {
-			tbf := getTmpBlocksFile()
-			defer putTmpBlocksFile(tbf)
-
-			// Write blocks until their summary size exceeds `size`.
-			var addrs []tmpBlockAddr
-			var blocks []*storage.Block
-			bb := tmpBufPool.Get()
-			defer tmpBufPool.Put(bb)
-			for tbf.offset < uint64(size) {
-				b := createBlock()
-				bb.B = storage.MarshalBlock(bb.B[:0], b)
-				addr, err := tbf.WriteBlockData(bb.B)
-				if err != nil {
-					return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
-				}
-				if addr.offset+uint64(addr.size) != tbf.offset {
-					return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
-				}
-				addrs = append(addrs, addr)
-				blocks = append(blocks, b)
-			}
-			if err := tbf.Finalize(); err != nil {
-				return fmt.Errorf("cannot finalize tbf: %s", err)
-			}
-
-			// Read blocks in parallel and verify them
-			concurrency := 2
-			workCh := make(chan int)
-			doneCh := make(chan error)
-			for i := 0; i < concurrency; i++ {
-				go func() {
-					doneCh <- func() error {
-						var b1 storage.Block
-						for idx := range workCh {
-							addr := addrs[idx]
-							b := blocks[idx]
-							if err := b.UnmarshalData(); err != nil {
-								return fmt.Errorf("cannot unmarshal data from the original block: %s", err)
-							}
-							b1.Reset()
-							tbf.MustReadBlockAt(&b1, addr)
-							if err := b1.UnmarshalData(); err != nil {
-								return fmt.Errorf("cannot unmarshal data from tbf: %s", err)
-							}
-							if b1.RowsCount() != b.RowsCount() {
-								return fmt.Errorf("unexpected number of rows in tbf block; got %d; want %d", b1.RowsCount(), b.RowsCount())
-							}
-							if !reflect.DeepEqual(b1.Timestamps(), b.Timestamps()) {
-								return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", b1.Timestamps(), b.Timestamps())
-							}
-							if !reflect.DeepEqual(b1.Values(), b.Values()) {
-								return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", b1.Values(), b.Values())
-							}
-						}
-						return nil
-					}()
-				}()
-			}
-			for i := range addrs {
-				workCh <- i
-			}
-			close(workCh)
-			for i := 0; i < concurrency; i++ {
-				select {
-				case err := <-doneCh:
-					if err != nil {
-						return err
-					}
-				case <-time.After(time.Second):
-					return fmt.Errorf("timeout")
-				}
-			}
-			return nil
-		}()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/lib/fs/fadvise_darwin.go b/lib/fs/fadvise_darwin.go
deleted file mode 100644
index 73cfe81a7..000000000
--- a/lib/fs/fadvise_darwin.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package fs
-
-import (
-	"os"
-)
-
-func fadviseSequentialRead(f *os.File, prefetch bool) error {
-	// TODO: implement this properly
-	return nil
-}
diff --git a/lib/fs/fadvise_unix.go b/lib/fs/fadvise_unix.go
deleted file mode 100644
index 21018796e..000000000
--- a/lib/fs/fadvise_unix.go
+++ /dev/null
@@ -1,22 +0,0 @@
-// +build linux freebsd
-
-package fs
-
-import (
-	"fmt"
-	"os"
-
-	"golang.org/x/sys/unix"
-)
-
-func fadviseSequentialRead(f *os.File, prefetch bool) error {
-	fd := int(f.Fd())
-	mode := unix.FADV_SEQUENTIAL
-	if prefetch {
-		mode |= unix.FADV_WILLNEED
-	}
-	if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil {
-		return fmt.Errorf("error returned from unix.Fadvise(%d): %s", mode, err)
-	}
-	return nil
-}
diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go
index 26bf4df3f..23ceb91cb 100644
--- a/lib/fs/reader_at.go
+++ b/lib/fs/reader_at.go
@@ -67,15 +67,6 @@ func (r *ReaderAt) MustClose() {
 	readersCount.Dec()
 }
 
-// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
-//
-// if prefetch is set, then the OS is hinted to prefetch f data.
-func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
-	if err := fadviseSequentialRead(r.f, prefetch); err != nil {
-		logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err)
-	}
-}
-
 // OpenReaderAt opens ReaderAt for reading from filename.
 //
 // MustClose must be called on the returned ReaderAt when it is no longer needed.
@@ -94,7 +85,6 @@ func OpenReaderAt(path string) (*ReaderAt, error) {
 		}
 		r.mmapData = data
 	}
-	r.MustFadviseSequentialRead(false)
 	readersCount.Inc()
 	return &r, nil
 }
diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go
index 03aa7ec7b..7e1d02150 100644
--- a/lib/storage/part_search.go
+++ b/lib/storage/part_search.go
@@ -15,8 +15,8 @@ import (
 // partSearch represents blocks stream for the given search args
 // passed to Init.
 type partSearch struct {
-	// Block contains the found block after NextBlock call.
-	Block Block
+	// BlockRef contains the reference to the found block after NextBlock call.
+	BlockRef BlockRef
 
 	// p is the part to search.
 	p *part
@@ -30,9 +30,6 @@ type partSearch struct {
 	// tr is a time range to search.
 	tr TimeRange
 
-	// Skip populating timestampsData and valuesData in Block if fetchData=false.
-	fetchData bool
-
 	metaindex []metaindexRow
 
 	ibCache *indexBlockCache
@@ -49,11 +46,10 @@ type partSearch struct {
 }
 
 func (ps *partSearch) reset() {
-	ps.Block.Reset()
+	ps.BlockRef.reset()
 	ps.p = nil
 	ps.tsids = nil
 	ps.tsidIdx = 0
-	ps.fetchData = true
 	ps.metaindex = nil
 	ps.ibCache = nil
 	ps.bhs = nil
@@ -74,7 +70,7 @@ var isInTest = func() bool {
 //
 // tsids must be sorted.
 // tsids cannot be modified after the Init call, since it is owned by ps.
-func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
 	ps.reset()
 	ps.p = p
 
@@ -86,7 +82,6 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool)
 		ps.tsids = tsids
 	}
 	ps.tr = tr
-	ps.fetchData = fetchData
 	ps.metaindex = p.metaindex
 	ps.ibCache = p.ibCache
 
@@ -95,7 +90,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool)
 	ps.nextTSID()
 }
 
-// NextBlock advances to the next Block.
+// NextBlock advances to the next BlockRef.
 //
 // Returns true on success.
 //
@@ -130,7 +125,7 @@ func (ps *partSearch) nextTSID() bool {
 		ps.err = io.EOF
 		return false
 	}
-	ps.Block.bh.TSID = ps.tsids[ps.tsidIdx]
+	ps.BlockRef.bh.TSID = ps.tsids[ps.tsidIdx]
 	ps.tsidIdx++
 	return true
 }
@@ -139,20 +134,20 @@ func (ps *partSearch) nextBHS() bool {
 	for len(ps.metaindex) > 0 {
 		// Optimization: skip tsid values smaller than the minimum value
 		// from ps.metaindex.
-		for ps.Block.bh.TSID.Less(&ps.metaindex[0].TSID) {
+		for ps.BlockRef.bh.TSID.Less(&ps.metaindex[0].TSID) {
 			if !ps.nextTSID() {
 				return false
 			}
 		}
-		// Invariant: ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		// Invariant: ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID
 
-		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.Block.bh.TSID)
-		// Invariant: len(ps.metaindex) > 0 && ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.BlockRef.bh.TSID)
+		// Invariant: len(ps.metaindex) > 0 && ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID
 
 		mr := &ps.metaindex[0]
 		ps.metaindex = ps.metaindex[1:]
-		if ps.Block.bh.TSID.Less(&mr.TSID) {
-			logger.Panicf("BUG: invariant violation: ps.Block.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.Block.bh.TSID, &mr.TSID)
+		if ps.BlockRef.bh.TSID.Less(&mr.TSID) {
+			logger.Panicf("BUG: invariant violation: ps.BlockRef.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.BlockRef.bh.TSID, &mr.TSID)
 		}
 
 		if mr.MaxTimestamp < ps.tr.MinTimestamp {
@@ -165,7 +160,7 @@ func (ps *partSearch) nextBHS() bool {
 		}
 
 		// Found the index block which may contain the required data
-		// for the ps.Block.bh.TSID and the given timestamp range.
+		// for the ps.BlockRef.bh.TSID and the given timestamp range.
 		if ps.indexBlockReuse != nil {
 			putIndexBlock(ps.indexBlockReuse)
 			ps.indexBlockReuse = nil
@@ -249,15 +244,15 @@ func (ps *partSearch) searchBHS() bool {
 		bh := &ps.bhs[i]
 
 	nextTSID:
-		if bh.TSID.Less(&ps.Block.bh.TSID) {
+		if bh.TSID.Less(&ps.BlockRef.bh.TSID) {
 			// Skip blocks with small tsid values.
 			continue
 		}
-		// Invariant: ps.Block.bh.TSID <= bh.TSID
+		// Invariant: ps.BlockRef.bh.TSID <= bh.TSID
 
-		if bh.TSID.MetricID != ps.Block.bh.TSID.MetricID {
-			// ps.Block.bh.TSID < bh.TSID: no more blocks with the given tsid.
+		if bh.TSID.MetricID != ps.BlockRef.bh.TSID.MetricID {
+			// ps.BlockRef.bh.TSID < bh.TSID: no more blocks with the given tsid.
 			// Proceed to the next (bigger) tsid.
 			if !ps.nextTSID() {
 				return false
@@ -284,7 +279,7 @@ func (ps *partSearch) searchBHS() bool {
 
 		// Found the tsid block with the matching timestamp range.
 		// Read it.
-		ps.readBlock(bh)
+		ps.BlockRef.init(ps.p, bh)
 
 		ps.bhs = ps.bhs[i+1:]
 		return true
@@ -293,17 +288,3 @@ func (ps *partSearch) searchBHS() bool {
 	ps.bhs = nil
 	return false
 }
-
-func (ps *partSearch) readBlock(bh *blockHeader) {
-	ps.Block.Reset()
-	ps.Block.bh = *bh
-	if !ps.fetchData {
-		return
-	}
-
-	ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
-	ps.p.timestampsFile.MustReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
-
-	ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
-	ps.p.valuesFile.MustReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
-}
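Note: partSearch now just positions a BlockRef (a part pointer plus a blockHeader) and never touches the timestamps/values files itself; the fetchData decision moves from Init to the moment a block is actually materialized. An in-package sketch of the resulting two-phase scan (partSearch and BlockRef internals are unexported), modeled on part_search_test.go below:

    var ps partSearch
    ps.Init(p, tsids, tr)
    for ps.NextBlock() {
        var b Block
        // fetchData=false copies only the block header; no file reads happen
        // until a later MustReadBlock(&b, true) asks for the payload.
        ps.BlockRef.MustReadBlock(&b, false)
    }
    if err := ps.Error(); err != nil {
        // handle the scan error
    }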
diff --git a/lib/storage/part_search_test.go b/lib/storage/part_search_test.go
index 11166debf..d06d2ac01 100644
--- a/lib/storage/part_search_test.go
+++ b/lib/storage/part_search_test.go
@@ -1247,11 +1247,11 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR
 
 func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
 	var ps partSearch
-	ps.Init(p, tsids, tr, true)
+	ps.Init(p, tsids, tr)
 	var bs []Block
 	for ps.NextBlock() {
 		var b Block
-		b.CopyFrom(&ps.Block)
+		ps.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ps.Error(); err != nil {
diff --git a/lib/storage/partition_search.go b/lib/storage/partition_search.go
index 3e2ed4020..b7ceaa015 100644
--- a/lib/storage/partition_search.go
+++ b/lib/storage/partition_search.go
@@ -10,8 +10,8 @@ import (
 
 // partitionSearch represents a search in the partition.
 type partitionSearch struct {
-	// Block is the block found after NextBlock call.
-	Block *Block
+	// BlockRef is the block found after NextBlock call.
+	BlockRef *BlockRef
 
 	// pt is a partition to search.
 	pt *partition
@@ -30,7 +30,7 @@ type partitionSearch struct {
 }
 
 func (pts *partitionSearch) reset() {
-	pts.Block = nil
+	pts.BlockRef = nil
 	pts.pt = nil
 
 	for i := range pts.pws {
@@ -59,7 +59,7 @@ func (pts *partitionSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by pts.
 //
 /// MustClose must be called when partition search is done.
-func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
+func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
 	if pts.needClosing {
 		logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
 	}
@@ -88,7 +88,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 	}
 	pts.psPool = pts.psPool[:len(pts.pws)]
 	for i, pw := range pts.pws {
-		pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
+		pts.psPool[i].Init(pw.p, tsids, tr)
 	}
 
 	// Initialize the psHeap.
@@ -114,7 +114,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 		return
 	}
 	heap.Init(&pts.psHeap)
-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	pts.nextBlockNoop = true
 }
 
@@ -145,7 +145,7 @@ func (pts *partitionSearch) nextBlock() error {
 	psMin := pts.psHeap[0]
 	if psMin.NextBlock() {
 		heap.Fix(&pts.psHeap, 0)
-		pts.Block = &pts.psHeap[0].Block
+		pts.BlockRef = &pts.psHeap[0].BlockRef
 		return nil
 	}
 
@@ -159,7 +159,7 @@ func (pts *partitionSearch) nextBlock() error {
 		return io.EOF
 	}
 
-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	return nil
 }
 
@@ -188,7 +188,7 @@ func (psh *partSearchHeap) Len() int {
 
 func (psh *partSearchHeap) Less(i, j int) bool {
 	x := *psh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }
 
 func (psh *partSearchHeap) Swap(i, j int) {
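Note: the heap-based merge across parts is unchanged; only the element type moves from Block to BlockRef, and pts.BlockRef is re-pointed at the heap top after every NextBlock, so a reference is valid only until the next NextBlock or MustClose call. An in-package sketch mirroring partition_search_test.go below:

    var pts partitionSearch
    pts.Init(pt, tsids, tr)
    for pts.NextBlock() {
        // pts.BlockRef points at the current heap top and is re-bound by the
        // next NextBlock call, so copy the block out before advancing.
        var b Block
        pts.BlockRef.MustReadBlock(&b, true)
    }
    if err := pts.Error(); err != nil {
        // handle the scan error
    }
    pts.MustClose()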
diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go
index 7f8b515b8..fd997600e 100644
--- a/lib/storage/partition_search_test.go
+++ b/lib/storage/partition_search_test.go
@@ -240,10 +240,10 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 
 	bs := []Block{}
 	var pts partitionSearch
-	pts.Init(pt, tsids, tr, true)
+	pts.Init(pt, tsids, tr)
 	for pts.NextBlock() {
 		var b Block
-		b.CopyFrom(pts.Block)
+		pts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := pts.Error(); err != nil {
@@ -265,18 +265,9 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 	}
 
 	// verify that empty tsids returns empty result
-	pts.Init(pt, []TSID{}, tr, true)
+	pts.Init(pt, []TSID{}, tr)
 	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
-	}
-	if err := pts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
-	}
-	pts.MustClose()
-
-	pts.Init(pt, []TSID{}, tr, false)
-	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.BlockRef)
 	}
 	if err := pts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 831818686..500608b8f 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -9,77 +9,55 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
-// MetricBlock is a time series block for a single metric.
-type MetricBlock struct {
+// BlockRef references a Block.
+//
+// BlockRef is valid only until the corresponding Search is valid,
+// i.e. it becomes invalid after Search.MustClose is called.
+type BlockRef struct {
+	p  *part
+	bh blockHeader
+}
+
+func (br *BlockRef) reset() {
+	br.p = nil
+	br.bh = blockHeader{}
+}
+
+func (br *BlockRef) init(p *part, bh *blockHeader) {
+	br.p = p
+	br.bh = *bh
+}
+
+// MustReadBlock reads block from br to dst.
+//
+// if fetchData is false, then only block header is read, otherwise all the data is read.
+func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
+	dst.Reset()
+	dst.bh = br.bh
+	if !fetchData {
+		return
+	}
+
+	dst.timestampsData = bytesutil.Resize(dst.timestampsData[:0], int(br.bh.TimestampsBlockSize))
+	br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
+
+	dst.valuesData = bytesutil.Resize(dst.valuesData[:0], int(br.bh.ValuesBlockSize))
+	br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset))
+}
+
+// MetricBlockRef contains reference to time series block for a single metric.
+type MetricBlockRef struct {
+	// The metric name
 	MetricName []byte
 
-	Block *Block
-}
-
-// Marshal marshals MetricBlock to dst
-func (mb *MetricBlock) Marshal(dst []byte) []byte {
-	dst = encoding.MarshalBytes(dst, mb.MetricName)
-	return MarshalBlock(dst, mb.Block)
-}
-
-// MarshalBlock marshals b to dst.
-//
-// b.MarshalData must be called on b before calling MarshalBlock.
-func MarshalBlock(dst []byte, b *Block) []byte {
-	dst = b.bh.Marshal(dst)
-	dst = encoding.MarshalBytes(dst, b.timestampsData)
-	dst = encoding.MarshalBytes(dst, b.valuesData)
-	return dst
-}
-
-// Unmarshal unmarshals MetricBlock from src
-func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
-	if mb.Block == nil {
-		logger.Panicf("BUG: MetricBlock.Block must be non-nil when calling Unmarshal!")
-	} else {
-		mb.Block.Reset()
-	}
-	tail, mn, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal MetricName: %s", err)
-	}
-	mb.MetricName = append(mb.MetricName[:0], mn...)
-	src = tail
-
-	return UnmarshalBlock(mb.Block, src)
-}
-
-// UnmarshalBlock unmarshal Block from src to dst.
-//
-// dst.UnmarshalData isn't called on the block.
-func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
-	tail, err := dst.bh.Unmarshal(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal blockHeader: %s", err)
-	}
-	src = tail
-
-	tail, tds, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal timestampsData: %s", err)
-	}
-	dst.timestampsData = append(dst.timestampsData[:0], tds...)
-	src = tail
-
-	tail, vd, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal valuesData: %s", err)
-	}
-	dst.valuesData = append(dst.valuesData[:0], vd...)
-	src = tail
-
-	return src, nil
+	// The block reference. Call BlockRef.MustReadBlock in order to obtain the block.
+	BlockRef *BlockRef
 }
 
 // Search is a search for time series.
 type Search struct {
-	// MetricBlock is updated with each Search.NextMetricBlock call.
-	MetricBlock MetricBlock
+	// MetricBlockRef is updated with each Search.NextMetricBlock call.
+	MetricBlockRef MetricBlockRef
 
 	storage *Storage
 
@@ -91,8 +69,8 @@ type Search struct {
 }
 
 func (s *Search) reset() {
-	s.MetricBlock.MetricName = s.MetricBlock.MetricName[:0]
-	s.MetricBlock.Block = nil
+	s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0]
+	s.MetricBlockRef.BlockRef = nil
 
 	s.storage = nil
 	s.ts.reset()
@@ -103,7 +81,7 @@ func (s *Search) reset() {
 // Init initializes s from the given storage, tfss and tr.
 //
 // MustClose must be called when the search is done.
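-func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
+func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
 	if s.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -118,7 +96,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
-	s.ts.Init(storage.tb, tsids, tr, fetchData)
+	s.ts.Init(storage.tb, tsids, tr)
 
 	if err != nil {
 		s.err = err
@@ -145,15 +123,15 @@ func (s *Search) Error() error {
 	return s.err
 }
 
-// NextMetricBlock proceeds to the next MetricBlock.
+// NextMetricBlock proceeds to the next MetricBlockRef.
 func (s *Search) NextMetricBlock() bool {
 	if s.err != nil {
 		return false
 	}
 	for s.ts.NextBlock() {
-		tsid := &s.ts.Block.bh.TSID
+		tsid := &s.ts.BlockRef.bh.TSID
 		var err error
-		s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID)
+		s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
 		if err != nil {
 			if err == io.EOF {
 				// Skip missing metricName for tsid.MetricID.
@@ -163,7 +141,7 @@ func (s *Search) NextMetricBlock() bool {
 			s.err = err
 			return false
 		}
-		s.MetricBlock.Block = s.ts.Block
+		s.MetricBlockRef.BlockRef = s.ts.BlockRef
 		return true
 	}
 	if err := s.ts.Error(); err != nil {

Note: this is the core of the change. Search now exposes a MetricBlockRef (metric name plus BlockRef) instead of a fully unpacked MetricBlock, and the MarshalBlock/UnmarshalBlock round-trip disappears because callers read block data straight from the part via BlockRef.MustReadBlock. A sketch of the new calling sequence (st, tfs and tr are assumed to be prepared elsewhere; the iteration mirrors search_test.go below):

    var s storage.Search
    s.Init(st, []*storage.TagFilters{tfs}, tr, 1e5)
    for s.NextMetricBlock() {
        var b storage.Block
        // The BlockRef stays valid only while s is open.
        s.MetricBlockRef.BlockRef.MustReadBlock(&b, true)
        // b now holds the block for the series named s.MetricBlockRef.MetricName.
    }
    if err := s.Error(); err != nil {
        // handle the search error
    }
    s.MustClose() // invalidates every BlockRef obtained from s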
diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go
index 6500907cc..d5f6feb2e 100644
--- a/lib/storage/search_test.go
+++ b/lib/storage/search_test.go
@@ -208,15 +208,20 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
 		expectedMrs = append(expectedMrs, *mr)
 	}
 
+	type metricBlock struct {
+		MetricName []byte
+		Block      *Block
+	}
+
 	// Search
-	s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
-	var mbs []MetricBlock
+	s.Init(st, []*TagFilters{tfs}, tr, 1e5)
+	var mbs []metricBlock
 	for s.NextMetricBlock() {
 		var b Block
-		b.CopyFrom(s.MetricBlock.Block)
+		s.MetricBlockRef.BlockRef.MustReadBlock(&b, true)
 
-		var mb MetricBlock
-		mb.MetricName = append(mb.MetricName, s.MetricBlock.MetricName...)
+		var mb metricBlock
+		mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...)
 		mb.Block = &b
 		mbs = append(mbs, mb)
 	}
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 75ecacb23..8cd44faa1 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -583,24 +583,13 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
 		MaxTimestamp: 2e10,
 	}
 	metricBlocksCount := func(tfs *TagFilters) int {
-		// Verify the number of blocks with fetchData=true
+		// Verify the number of blocks
 		n := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
+		sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
 		for sr.NextMetricBlock() {
 			n++
 		}
 		sr.MustClose()
-
-		// Make sure the number of blocks with fetchData=false is the same.
-		m := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
-		for sr.NextMetricBlock() {
-			m++
-		}
-		sr.MustClose()
-		if n != m {
-			return -1
-		}
 		return n
 	}
 	for i := 0; i < metricsCount; i++ {
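Note: because fetchData is no longer fixed at Init time, the storage_test.go double scan above (fetchData=true versus fetchData=false) became redundant: both variants now walk exactly the same BlockRefs and differ only at read time. The same reference can serve both uses (br here is an assumed *storage.BlockRef already in scope):

    var hdr, full storage.Block
    br.MustReadBlock(&hdr, false) // header only: no timestamps/values file reads
    br.MustReadBlock(&full, true) // header plus timestampsData and valuesData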
diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go
index 1f4f4ca56..b82ff9011 100644
--- a/lib/storage/table_search.go
+++ b/lib/storage/table_search.go
@@ -11,7 +11,7 @@ import (
 
 // tableSearch performs searches in the table.
 type tableSearch struct {
-	Block *Block
+	BlockRef *BlockRef
 
 	tb *table
 
@@ -29,7 +29,7 @@ type tableSearch struct {
 }
 
 func (ts *tableSearch) reset() {
-	ts.Block = nil
+	ts.BlockRef = nil
 	ts.tb = nil
 
 	for i := range ts.ptws {
@@ -58,7 +58,7 @@ func (ts *tableSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by ts.
 //
 // MustClose must be called then the tableSearch is done.
-func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	if ts.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -89,7 +89,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 	}
 	ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
 	for i, ptw := range ts.ptws {
-		ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
+		ts.ptsPool[i].Init(ptw.pt, tsids, tr)
 	}
 
 	// Initialize the ptsHeap.
@@ -115,13 +115,13 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 		return
 	}
 	heap.Init(&ts.ptsHeap)
-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	ts.nextBlockNoop = true
 }
 
 // NextBlock advances to the next block.
 //
-// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
+// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
 // for the same TSID may contain overlapped time ranges.
 func (ts *tableSearch) NextBlock() bool {
 	if ts.err != nil {
@@ -146,7 +146,7 @@ func (ts *tableSearch) nextBlock() error {
 	ptsMin := ts.ptsHeap[0]
 	if ptsMin.NextBlock() {
 		heap.Fix(&ts.ptsHeap, 0)
-		ts.Block = ts.ptsHeap[0].Block
+		ts.BlockRef = ts.ptsHeap[0].BlockRef
 		return nil
 	}
 
@@ -160,7 +160,7 @@ func (ts *tableSearch) nextBlock() error {
 		return io.EOF
 	}
 
-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	return nil
 }
 
@@ -192,7 +192,7 @@ func (ptsh *partitionSearchHeap) Len() int {
 
 func (ptsh *partitionSearchHeap) Less(i, j int) bool {
 	x := *ptsh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }
 
 func (ptsh *partitionSearchHeap) Swap(i, j int) {
diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go
index 6c75a1a08..c821b5e93 100644
--- a/lib/storage/table_search_test.go
+++ b/lib/storage/table_search_test.go
@@ -251,10 +251,10 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 
 	bs := []Block{}
 	var ts tableSearch
-	ts.Init(tb, tsids, tr, true)
+	ts.Init(tb, tsids, tr)
 	for ts.NextBlock() {
 		var b Block
-		b.CopyFrom(ts.Block)
+		ts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ts.Error(); err != nil {
@@ -276,23 +276,14 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 	}
 
 	// verify that empty tsids returns empty result
-	ts.Init(tb, []TSID{}, tr, true)
+	ts.Init(tb, []TSID{}, tr)
 	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.BlockRef)
 	}
 	if err := ts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
 	}
 	ts.MustClose()
 
-	ts.Init(tb, []TSID{}, tr, false)
-	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
-	}
-	ts.MustClose()
-
 	return nil
 }
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index df9af44f4..fe4d8c8df 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -127,12 +127,14 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int,
 	b.RunParallel(func(pb *testing.PB) {
 		var ts tableSearch
 		tsids := make([]TSID, tsidsSearch)
+		var tmpBlock Block
 		for pb.Next() {
 			for i := range tsids {
 				tsids[i].MetricID = 1 + uint64(i)
 			}
-			ts.Init(tb, tsids, tr, fetchData)
+			ts.Init(tb, tsids, tr)
 			for ts.NextBlock() {
+				ts.BlockRef.MustReadBlock(&tmpBlock, fetchData)
 			}
 			ts.MustClose()
 		}