Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-02-09 15:27:11 +00:00.
lib/storage: postpone reading data from blocks during search

This eliminates the need for storing block data in temporary files on a single-node VictoriaMetrics during heavy queries that touch a large number of time series over long time ranges. It improves single-node VictoriaMetrics performance on heavy queries by up to 2x.
This commit is contained in:
parent 0224071ebe
commit b4afe562c1

17 changed files with 143 additions and 605 deletions
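Conceptually, the change replaces the eager copy of every found block into a temporary blocks file with lazy block references: lib/storage.Search now yields MetricBlockRef values, and the block payload is read from the underlying part only when a caller asks for it. The sketch below is not part of the commit; the helper name and the surrounding setup are assumptions made for illustration of how a consumer of the new API might read blocks lazily.

package example

import (
    "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// collectBlocks is a hypothetical helper: it drains an already initialized
// storage.Search and materializes each referenced block only when needed.
// Passing fetchData=false to MustReadBlock would read just the block headers.
func collectBlocks(sr *storage.Search) ([]storage.Block, error) {
    var blocks []storage.Block
    for sr.NextMetricBlock() {
        var b storage.Block
        // The block payload is read from the part here, lazily, instead of
        // being marshaled into a temporary blocks file during the search.
        sr.MetricBlockRef.BlockRef.MustReadBlock(&b, true)
        blocks = append(blocks, b)
    }
    if err := sr.Error(); err != nil {
        return nil, err
    }
    return blocks, nil
}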
@@ -8,11 +8,9 @@ import (
 	"strings"
 	"time"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
@@ -43,9 +41,6 @@ func getDefaultMaxConcurrentRequests() int {
 
 // Init initializes vmselect
 func Init() {
-	tmpDirPath := *vmstorage.DataPath + "/tmp"
-	fs.RemoveDirContents(tmpDirPath)
-	netstorage.InitTmpBlocksDir(tmpDirPath)
 	promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
 
 	concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
@@ -53,9 +53,8 @@ type Results struct {
 	fetchData bool
 	deadline  Deadline
 
-	tbf *tmpBlocksFile
-
 	packedTimeseries []packedTimeseries
+	sr               *storage.Search
 }
 
 // Len returns the number of results in rss.
@@ -65,8 +64,12 @@ func (rss *Results) Len() int {
 
 // Cancel cancels rss work.
 func (rss *Results) Cancel() {
-	putTmpBlocksFile(rss.tbf)
-	rss.tbf = nil
+	rss.mustClose()
+}
+
+func (rss *Results) mustClose() {
+	putStorageSearch(rss.sr)
+	rss.sr = nil
 }
 
 // RunParallel runs in parallel f for all the results from rss.
@@ -76,10 +79,7 @@ func (rss *Results) Cancel() {
 //
 // rss becomes unusable after the call to RunParallel.
 func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
-	defer func() {
-		putTmpBlocksFile(rss.tbf)
-		rss.tbf = nil
-	}()
+	defer rss.mustClose()
 
 	workersCount := 1 + len(rss.packedTimeseries)/32
 	if workersCount > gomaxprocs {
@@ -106,7 +106,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 			break
 		}
-		if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
+		if err = pts.Unpack(rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
 			break
 		}
 		if len(rs.Timestamps) == 0 && rss.fetchData {
@@ -156,18 +156,18 @@ var gomaxprocs = runtime.GOMAXPROCS(-1)
 
 type packedTimeseries struct {
 	metricName string
-	addrs      []tmpBlockAddr
+	brs        []storage.BlockRef
 }
 
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
+func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
 	dst.reset()
 
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %s", pts.metricName, err)
 	}
 
-	workersCount := 1 + len(pts.addrs)/32
+	workersCount := 1 + len(pts.brs)/32
 	if workersCount > maxWorkersCount {
 		workersCount = maxWorkersCount
 	}
@@ -175,19 +175,19 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 		logger.Panicf("BUG: workersCount cannot be zero")
 	}
 
-	sbs := make([]*sortBlock, 0, len(pts.addrs))
+	sbs := make([]*sortBlock, 0, len(pts.brs))
 	var sbsLock sync.Mutex
 
-	workCh := make(chan tmpBlockAddr, workersCount)
+	workCh := make(chan storage.BlockRef, workersCount)
 	doneCh := make(chan error)
 
 	// Start workers
 	for i := 0; i < workersCount; i++ {
 		go func() {
 			var err error
-			for addr := range workCh {
+			for br := range workCh {
 				sb := getSortBlock()
-				if err = sb.unpackFrom(tbf, addr, tr, fetchData); err != nil {
+				if err = sb.unpackFrom(br, tr, fetchData); err != nil {
 					break
 				}
 
@@ -204,10 +204,10 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 	}
 
 	// Feed workers with work
-	for _, addr := range pts.addrs {
-		workCh <- addr
+	for _, br := range pts.brs {
+		workCh <- br
 	}
-	pts.addrs = pts.addrs[:0]
+	pts.brs = pts.brs[:0]
 	close(workCh)
 
 	// Wait until workers finish
@@ -314,8 +314,8 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
 
-func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool) error {
-	tbf.MustReadBlockAt(&sb.b, addr)
+func (sb *sortBlock) unpackFrom(br storage.BlockRef, tr storage.TimeRange, fetchData bool) error {
+	br.MustReadBlock(&sb.b, fetchData)
 	if fetchData {
 		if err := sb.b.UnmarshalData(); err != nil {
 			return fmt.Errorf("cannot unmarshal block: %s", err)
@@ -483,6 +483,8 @@ func putStorageSearch(sr *storage.Search) {
 var ssPool sync.Pool
 
 // ProcessSearchQuery performs sq on storage nodes until the given deadline.
+//
+// Results.RunParallel or Results.Cancel must be called on the returned Results.
 func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) {
 	// Setup search.
 	tfss, err := setupTfss(sq.TagFilterss)
@@ -498,56 +500,40 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
 	defer vmstorage.WG.Done()
 
 	sr := getStorageSearch()
-	defer putStorageSearch(sr)
-	sr.Init(vmstorage.Storage, tfss, tr, fetchData, *maxMetricsPerSearch)
+	sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch)
 
-	tbf := getTmpBlocksFile()
-	m := make(map[string][]tmpBlockAddr)
+	m := make(map[string][]storage.BlockRef)
 	var orderedMetricNames []string
 	blocksRead := 0
-	bb := tmpBufPool.Get()
-	defer tmpBufPool.Put(bb)
 	for sr.NextMetricBlock() {
 		blocksRead++
-		bb.B = storage.MarshalBlock(bb.B[:0], sr.MetricBlock.Block)
-		addr, err := tbf.WriteBlockData(bb.B)
-		if err != nil {
-			putTmpBlocksFile(tbf)
-			return nil, fmt.Errorf("cannot write data block #%d to temporary blocks file: %s", blocksRead, err)
-		}
 		if time.Until(deadline.Deadline) < 0 {
-			putTmpBlocksFile(tbf)
 			return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
 		}
-		metricName := sr.MetricBlock.MetricName
-		addrs := m[string(metricName)]
-		if len(addrs) == 0 {
+		metricName := sr.MetricBlockRef.MetricName
+		brs := m[string(metricName)]
+		if len(brs) == 0 {
 			orderedMetricNames = append(orderedMetricNames, string(metricName))
 		}
-		m[string(metricName)] = append(addrs, addr)
+		m[string(metricName)] = append(brs, *sr.MetricBlockRef.BlockRef)
 	}
 	if err := sr.Error(); err != nil {
-		putTmpBlocksFile(tbf)
 		return nil, fmt.Errorf("search error after reading %d data blocks: %s", blocksRead, err)
 	}
-	if err := tbf.Finalize(); err != nil {
-		putTmpBlocksFile(tbf)
-		return nil, fmt.Errorf("cannot finalize temporary blocks file with %d blocks: %s", blocksRead, err)
-	}
 
 	var rss Results
 	rss.tr = tr
 	rss.fetchData = fetchData
 	rss.deadline = deadline
-	rss.tbf = tbf
 	pts := make([]packedTimeseries, len(orderedMetricNames))
 	for i, metricName := range orderedMetricNames {
 		pts[i] = packedTimeseries{
 			metricName: metricName,
-			addrs:      m[metricName],
+			brs:        m[metricName],
 		}
 	}
 	rss.packedTimeseries = pts
+	rss.sr = sr
 	return &rss, nil
 }
 
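Because blocks are no longer copied out of storage during ProcessSearchQuery, the returned Results now keeps the underlying storage.Search open, and either Results.RunParallel or Results.Cancel must release it. A minimal caller-side sketch follows, assuming a hypothetical query handler that is not part of this commit.

package example

import (
    "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
    "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// runQuery is a hypothetical caller: RunParallel (or Cancel) must always be
// invoked on the returned Results, since both end up in rss.mustClose(),
// which releases the storage.Search held by rss.
func runQuery(sq *storage.SearchQuery, deadline netstorage.Deadline) error {
    rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
    if err != nil {
        return err
    }
    return rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
        // rs holds the fully unpacked time series here; its blocks were
        // read lazily from storage rather than from a temporary file.
        _ = rs
    })
}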
@@ -1,185 +0,0 @@
-package netstorage
-
-import (
-	"fmt"
-	"io/ioutil"
-	"os"
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/metrics"
-)
-
-// InitTmpBlocksDir initializes directory to store temporary search results.
-//
-// It stores data in system-defined temporary directory if tmpDirPath is empty.
-func InitTmpBlocksDir(tmpDirPath string) {
-	if len(tmpDirPath) == 0 {
-		tmpDirPath = os.TempDir()
-	}
-	tmpBlocksDir = tmpDirPath + "/searchResults"
-	fs.MustRemoveAll(tmpBlocksDir)
-	if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
-		logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
-	}
-}
-
-var tmpBlocksDir string
-
-func maxInmemoryTmpBlocksFile() int {
-	mem := memory.Allowed()
-	maxLen := mem / 1024
-	if maxLen < 64*1024 {
-		return 64 * 1024
-	}
-	if maxLen > 4*1024*1024 {
-		return 4 * 1024 * 1024
-	}
-	return maxLen
-}
-
-var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
-	return float64(maxInmemoryTmpBlocksFile())
-})
-
-type tmpBlocksFile struct {
-	buf []byte
-
-	f *os.File
-	r *fs.ReaderAt
-
-	offset uint64
-}
-
-func getTmpBlocksFile() *tmpBlocksFile {
-	v := tmpBlocksFilePool.Get()
-	if v == nil {
-		return &tmpBlocksFile{
-			buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
-		}
-	}
-	return v.(*tmpBlocksFile)
-}
-
-func putTmpBlocksFile(tbf *tmpBlocksFile) {
-	tbf.MustClose()
-	tbf.buf = tbf.buf[:0]
-	tbf.f = nil
-	tbf.r = nil
-	tbf.offset = 0
-	tmpBlocksFilePool.Put(tbf)
-}
-
-var tmpBlocksFilePool sync.Pool
-
-type tmpBlockAddr struct {
-	offset uint64
-	size   int
-}
-
-func (addr tmpBlockAddr) String() string {
-	return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
-}
-
-var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
-
-// WriteBlockData writes b to tbf.
-//
-// It returns errors since the operation may fail on space shortage
-// and this must be handled.
-func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
-	var addr tmpBlockAddr
-	addr.offset = tbf.offset
-	addr.size = len(b)
-	tbf.offset += uint64(addr.size)
-	if len(tbf.buf)+len(b) <= cap(tbf.buf) {
-		// Fast path - the data fits tbf.buf
-		tbf.buf = append(tbf.buf, b...)
-		return addr, nil
-	}
-
-	// Slow path: flush the data from tbf.buf to file.
-	if tbf.f == nil {
-		f, err := ioutil.TempFile(tmpBlocksDir, "")
-		if err != nil {
-			return addr, err
-		}
-		tbf.f = f
-		tmpBlocksFilesCreated.Inc()
-	}
-	_, err := tbf.f.Write(tbf.buf)
-	tbf.buf = append(tbf.buf[:0], b...)
-	if err != nil {
-		return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
-	}
-	return addr, nil
-}
-
-func (tbf *tmpBlocksFile) Finalize() error {
-	if tbf.f == nil {
-		return nil
-	}
-	fname := tbf.f.Name()
-	if _, err := tbf.f.Write(tbf.buf); err != nil {
-		return fmt.Errorf("cannot write the remaining %d bytes to %q: %s", len(tbf.buf), fname, err)
-	}
-	tbf.buf = tbf.buf[:0]
-	r, err := fs.OpenReaderAt(fname)
-	if err != nil {
-		logger.Panicf("FATAL: cannot open %q: %s", fname, err)
-	}
-	// Hint the OS that the file is read almost sequentiallly.
-	// This should reduce the number of disk seeks, which is important
-	// for HDDs.
-	r.MustFadviseSequentialRead(true)
-	tbf.r = r
-	return nil
-}
-
-func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
-	var buf []byte
-	if tbf.f == nil {
-		buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
-	} else {
-		bb := tmpBufPool.Get()
-		defer tmpBufPool.Put(bb)
-		bb.B = bytesutil.Resize(bb.B, addr.size)
-		tbf.r.MustReadAt(bb.B, int64(addr.offset))
-		buf = bb.B
-	}
-	tail, err := storage.UnmarshalBlock(dst, buf)
-	if err != nil {
-		logger.Panicf("FATAL: cannot unmarshal data at %s: %s", addr, err)
-	}
-	if len(tail) > 0 {
-		logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling data at %s; len(tail)=%d", addr, len(tail))
-	}
-}
-
-var tmpBufPool bytesutil.ByteBufferPool
-
-func (tbf *tmpBlocksFile) MustClose() {
-	if tbf.f == nil {
-		return
-	}
-	if tbf.r != nil {
-		// tbf.r could be nil if Finalize wasn't called.
-		tbf.r.MustClose()
-	}
-	fname := tbf.f.Name()
-
-	// Remove the file at first, then close it.
-	// This way the OS shouldn't try to flush file contents to storage
-	// on close.
-	if err := os.Remove(fname); err != nil {
-		logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
-	}
-	if err := tbf.f.Close(); err != nil {
-		logger.Panicf("FATAL: cannot close %q: %s", fname, err)
-	}
-	tbf.f = nil
-}
@@ -1,153 +0,0 @@
-package netstorage
-
-import (
-	"fmt"
-	"math/rand"
-	"os"
-	"reflect"
-	"testing"
-	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-)
-
-func TestMain(m *testing.M) {
-	rand.Seed(time.Now().UnixNano())
-	tmpDir := "TestTmpBlocks"
-	InitTmpBlocksDir(tmpDir)
-	statusCode := m.Run()
-	if err := os.RemoveAll(tmpDir); err != nil {
-		logger.Panicf("cannot remove %q: %s", tmpDir, err)
-	}
-	os.Exit(statusCode)
-}
-
-func TestTmpBlocksFileSerial(t *testing.T) {
-	if err := testTmpBlocksFile(); err != nil {
-		t.Fatalf("unexpected error: %s", err)
-	}
-}
-
-func TestTmpBlocksFileConcurrent(t *testing.T) {
-	concurrency := 3
-	ch := make(chan error, concurrency)
-	for i := 0; i < concurrency; i++ {
-		go func() {
-			ch <- testTmpBlocksFile()
-		}()
-	}
-	for i := 0; i < concurrency; i++ {
-		select {
-		case err := <-ch:
-			if err != nil {
-				t.Fatalf("unexpected error: %s", err)
-			}
-		case <-time.After(30 * time.Second):
-			t.Fatalf("timeout")
-		}
-	}
-}
-
-func testTmpBlocksFile() error {
-	createBlock := func() *storage.Block {
-		rowsCount := rand.Intn(8000) + 1
-		var timestamps, values []int64
-		ts := int64(rand.Intn(1023434))
-		for i := 0; i < rowsCount; i++ {
-			ts += int64(rand.Intn(1000) + 1)
-			timestamps = append(timestamps, ts)
-			values = append(values, int64(i*i+rand.Intn(20)))
-		}
-		tsid := &storage.TSID{
-			MetricID: 234211,
-		}
-		scale := int16(rand.Intn(123))
-		precisionBits := uint8(rand.Intn(63) + 1)
-		var b storage.Block
-		b.Init(tsid, timestamps, values, scale, precisionBits)
-		_, _, _ = b.MarshalData(0, 0)
-		return &b
-	}
-	for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
-		err := func() error {
-			tbf := getTmpBlocksFile()
-			defer putTmpBlocksFile(tbf)
-
-			// Write blocks until their summary size exceeds `size`.
-			var addrs []tmpBlockAddr
-			var blocks []*storage.Block
-			bb := tmpBufPool.Get()
-			defer tmpBufPool.Put(bb)
-			for tbf.offset < uint64(size) {
-				b := createBlock()
-				bb.B = storage.MarshalBlock(bb.B[:0], b)
-				addr, err := tbf.WriteBlockData(bb.B)
-				if err != nil {
-					return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
-				}
-				if addr.offset+uint64(addr.size) != tbf.offset {
-					return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
-				}
-				addrs = append(addrs, addr)
-				blocks = append(blocks, b)
-			}
-			if err := tbf.Finalize(); err != nil {
-				return fmt.Errorf("cannot finalize tbf: %s", err)
-			}
-
-			// Read blocks in parallel and verify them
-			concurrency := 2
-			workCh := make(chan int)
-			doneCh := make(chan error)
-			for i := 0; i < concurrency; i++ {
-				go func() {
-					doneCh <- func() error {
-						var b1 storage.Block
-						for idx := range workCh {
-							addr := addrs[idx]
-							b := blocks[idx]
-							if err := b.UnmarshalData(); err != nil {
-								return fmt.Errorf("cannot unmarshal data from the original block: %s", err)
-							}
-							b1.Reset()
-							tbf.MustReadBlockAt(&b1, addr)
-							if err := b1.UnmarshalData(); err != nil {
-								return fmt.Errorf("cannot unmarshal data from tbf: %s", err)
-							}
-							if b1.RowsCount() != b.RowsCount() {
-								return fmt.Errorf("unexpected number of rows in tbf block; got %d; want %d", b1.RowsCount(), b.RowsCount())
-							}
-							if !reflect.DeepEqual(b1.Timestamps(), b.Timestamps()) {
-								return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", b1.Timestamps(), b.Timestamps())
-							}
-							if !reflect.DeepEqual(b1.Values(), b.Values()) {
-								return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", b1.Values(), b.Values())
-							}
-						}
-						return nil
-					}()
-				}()
-			}
-			for i := range addrs {
-				workCh <- i
-			}
-			close(workCh)
-			for i := 0; i < concurrency; i++ {
-				select {
-				case err := <-doneCh:
-					if err != nil {
-						return err
-					}
-				case <-time.After(time.Second):
-					return fmt.Errorf("timeout")
-				}
-			}
-			return nil
-		}()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
@@ -1,10 +0,0 @@
-package fs
-
-import (
-	"os"
-)
-
-func fadviseSequentialRead(f *os.File, prefetch bool) error {
-	// TODO: implement this properly
-	return nil
-}
@@ -1,22 +0,0 @@
-// +build linux freebsd
-
-package fs
-
-import (
-	"fmt"
-	"os"
-
-	"golang.org/x/sys/unix"
-)
-
-func fadviseSequentialRead(f *os.File, prefetch bool) error {
-	fd := int(f.Fd())
-	mode := unix.FADV_SEQUENTIAL
-	if prefetch {
-		mode |= unix.FADV_WILLNEED
-	}
-	if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil {
-		return fmt.Errorf("error returned from unix.Fadvise(%d): %s", mode, err)
-	}
-	return nil
-}
@@ -67,15 +67,6 @@ func (r *ReaderAt) MustClose() {
 	readersCount.Dec()
 }
 
-// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
-//
-// if prefetch is set, then the OS is hinted to prefetch f data.
-func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
-	if err := fadviseSequentialRead(r.f, prefetch); err != nil {
-		logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err)
-	}
-}
-
 // OpenReaderAt opens ReaderAt for reading from filename.
 //
 // MustClose must be called on the returned ReaderAt when it is no longer needed.
@@ -94,7 +85,6 @@ func OpenReaderAt(path string) (*ReaderAt, error) {
 		}
 		r.mmapData = data
 	}
-	r.MustFadviseSequentialRead(false)
 	readersCount.Inc()
 	return &r, nil
 }
@@ -15,8 +15,8 @@ import (
 // partSearch represents blocks stream for the given search args
 // passed to Init.
 type partSearch struct {
-	// Block contains the found block after NextBlock call.
-	Block Block
+	// BlockRef contains the reference to the found block after NextBlock call.
+	BlockRef BlockRef
 
 	// p is the part to search.
 	p *part
@@ -30,9 +30,6 @@ type partSearch struct {
 	// tr is a time range to search.
 	tr TimeRange
 
-	// Skip populating timestampsData and valuesData in Block if fetchData=false.
-	fetchData bool
-
 	metaindex []metaindexRow
 
 	ibCache *indexBlockCache
@@ -49,11 +46,10 @@ type partSearch struct {
 }
 
 func (ps *partSearch) reset() {
-	ps.Block.Reset()
+	ps.BlockRef.reset()
 	ps.p = nil
 	ps.tsids = nil
 	ps.tsidIdx = 0
-	ps.fetchData = true
 	ps.metaindex = nil
 	ps.ibCache = nil
 	ps.bhs = nil
@@ -74,7 +70,7 @@ var isInTest = func() bool {
 //
 // tsids must be sorted.
 // tsids cannot be modified after the Init call, since it is owned by ps.
-func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
 	ps.reset()
 	ps.p = p
 
@@ -86,7 +82,6 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
 		ps.tsids = tsids
 	}
 	ps.tr = tr
-	ps.fetchData = fetchData
 	ps.metaindex = p.metaindex
 	ps.ibCache = p.ibCache
 
@@ -95,7 +90,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
 		ps.nextTSID()
 	}
 
-// NextBlock advances to the next Block.
+// NextBlock advances to the next BlockRef.
 //
 // Returns true on success.
 //
@@ -130,7 +125,7 @@ func (ps *partSearch) nextTSID() bool {
 		ps.err = io.EOF
 		return false
 	}
-	ps.Block.bh.TSID = ps.tsids[ps.tsidIdx]
+	ps.BlockRef.bh.TSID = ps.tsids[ps.tsidIdx]
 	ps.tsidIdx++
 	return true
 }
@@ -139,20 +134,20 @@ func (ps *partSearch) nextBHS() bool {
 	for len(ps.metaindex) > 0 {
 		// Optimization: skip tsid values smaller than the minimum value
 		// from ps.metaindex.
-		for ps.Block.bh.TSID.Less(&ps.metaindex[0].TSID) {
+		for ps.BlockRef.bh.TSID.Less(&ps.metaindex[0].TSID) {
 			if !ps.nextTSID() {
 				return false
 			}
 		}
-		// Invariant: ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		// Invariant: ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID
 
-		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.Block.bh.TSID)
-		// Invariant: len(ps.metaindex) > 0 && ps.Block.bh.TSID >= ps.metaindex[0].TSID
+		ps.metaindex = skipSmallMetaindexRows(ps.metaindex, &ps.BlockRef.bh.TSID)
+		// Invariant: len(ps.metaindex) > 0 && ps.BlockRef.bh.TSID >= ps.metaindex[0].TSID
 
 		mr := &ps.metaindex[0]
 		ps.metaindex = ps.metaindex[1:]
-		if ps.Block.bh.TSID.Less(&mr.TSID) {
-			logger.Panicf("BUG: invariant violation: ps.Block.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.Block.bh.TSID, &mr.TSID)
+		if ps.BlockRef.bh.TSID.Less(&mr.TSID) {
+			logger.Panicf("BUG: invariant violation: ps.BlockRef.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.BlockRef.bh.TSID, &mr.TSID)
 		}
 
 		if mr.MaxTimestamp < ps.tr.MinTimestamp {
@@ -165,7 +160,7 @@ func (ps *partSearch) nextBHS() bool {
 		}
 
 		// Found the index block which may contain the required data
-		// for the ps.Block.bh.TSID and the given timestamp range.
+		// for the ps.BlockRef.bh.TSID and the given timestamp range.
 		if ps.indexBlockReuse != nil {
 			putIndexBlock(ps.indexBlockReuse)
 			ps.indexBlockReuse = nil
@@ -249,15 +244,15 @@ func (ps *partSearch) searchBHS() bool {
 		bh := &ps.bhs[i]
 
 	nextTSID:
-		if bh.TSID.Less(&ps.Block.bh.TSID) {
+		if bh.TSID.Less(&ps.BlockRef.bh.TSID) {
 			// Skip blocks with small tsid values.
 			continue
 		}
 
-		// Invariant: ps.Block.bh.TSID <= bh.TSID
+		// Invariant: ps.BlockRef.bh.TSID <= bh.TSID
 
-		if bh.TSID.MetricID != ps.Block.bh.TSID.MetricID {
-			// ps.Block.bh.TSID < bh.TSID: no more blocks with the given tsid.
+		if bh.TSID.MetricID != ps.BlockRef.bh.TSID.MetricID {
+			// ps.BlockRef.bh.TSID < bh.TSID: no more blocks with the given tsid.
 			// Proceed to the next (bigger) tsid.
 			if !ps.nextTSID() {
 				return false
@@ -284,7 +279,7 @@ func (ps *partSearch) searchBHS() bool {
 
 		// Found the tsid block with the matching timestamp range.
 		// Read it.
-		ps.readBlock(bh)
+		ps.BlockRef.init(ps.p, bh)
 
 		ps.bhs = ps.bhs[i+1:]
 		return true
@@ -293,17 +288,3 @@ func (ps *partSearch) searchBHS() bool {
 	ps.bhs = nil
 	return false
 }
-
-func (ps *partSearch) readBlock(bh *blockHeader) {
-	ps.Block.Reset()
-	ps.Block.bh = *bh
-	if !ps.fetchData {
-		return
-	}
-
-	ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
-	ps.p.timestampsFile.MustReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
-
-	ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
-	ps.p.valuesFile.MustReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
-}
@@ -1247,11 +1247,11 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR
 
 func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
 	var ps partSearch
-	ps.Init(p, tsids, tr, true)
+	ps.Init(p, tsids, tr)
 	var bs []Block
 	for ps.NextBlock() {
 		var b Block
-		b.CopyFrom(&ps.Block)
+		ps.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ps.Error(); err != nil {
@@ -10,8 +10,8 @@ import (
 
 // partitionSearch represents a search in the partition.
 type partitionSearch struct {
-	// Block is the block found after NextBlock call.
-	Block *Block
+	// BlockRef is the block found after NextBlock call.
+	BlockRef *BlockRef
 
 	// pt is a partition to search.
 	pt *partition
@@ -30,7 +30,7 @@ type partitionSearch struct {
 }
 
 func (pts *partitionSearch) reset() {
-	pts.Block = nil
+	pts.BlockRef = nil
 	pts.pt = nil
 
 	for i := range pts.pws {
@@ -59,7 +59,7 @@ func (pts *partitionSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by pts.
 //
 /// MustClose must be called when partition search is done.
-func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
+func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
 	if pts.needClosing {
 		logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
 	}
@@ -88,7 +88,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 	}
 	pts.psPool = pts.psPool[:len(pts.pws)]
 	for i, pw := range pts.pws {
-		pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
+		pts.psPool[i].Init(pw.p, tsids, tr)
 	}
 
 	// Initialize the psHeap.
@@ -114,7 +114,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetc
 		return
 	}
 	heap.Init(&pts.psHeap)
-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	pts.nextBlockNoop = true
 }
 
@@ -145,7 +145,7 @@ func (pts *partitionSearch) nextBlock() error {
 	psMin := pts.psHeap[0]
 	if psMin.NextBlock() {
 		heap.Fix(&pts.psHeap, 0)
-		pts.Block = &pts.psHeap[0].Block
+		pts.BlockRef = &pts.psHeap[0].BlockRef
 		return nil
 	}
 
@@ -159,7 +159,7 @@ func (pts *partitionSearch) nextBlock() error {
 		return io.EOF
 	}
 
-	pts.Block = &pts.psHeap[0].Block
+	pts.BlockRef = &pts.psHeap[0].BlockRef
 	return nil
 }
 
@@ -188,7 +188,7 @@ func (psh *partSearchHeap) Len() int {
 
 func (psh *partSearchHeap) Less(i, j int) bool {
 	x := *psh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }
 
 func (psh *partSearchHeap) Swap(i, j int) {
@@ -240,10 +240,10 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 
 	bs := []Block{}
 	var pts partitionSearch
-	pts.Init(pt, tsids, tr, true)
+	pts.Init(pt, tsids, tr)
 	for pts.NextBlock() {
 		var b Block
-		b.CopyFrom(pts.Block)
+		pts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := pts.Error(); err != nil {
@@ -265,18 +265,9 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 	}
 
 	// verify that empty tsids returns empty result
-	pts.Init(pt, []TSID{}, tr, true)
+	pts.Init(pt, []TSID{}, tr)
 	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
-	}
-	if err := pts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
-	}
-	pts.MustClose()
-
-	pts.Init(pt, []TSID{}, tr, false)
-	if pts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.BlockRef)
 	}
 	if err := pts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
@@ -9,77 +9,55 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
-// MetricBlock is a time series block for a single metric.
-type MetricBlock struct {
-	MetricName []byte
-
-	Block *Block
-}
-
-// Marshal marshals MetricBlock to dst
-func (mb *MetricBlock) Marshal(dst []byte) []byte {
-	dst = encoding.MarshalBytes(dst, mb.MetricName)
-	return MarshalBlock(dst, mb.Block)
-}
-
-// MarshalBlock marshals b to dst.
-//
-// b.MarshalData must be called on b before calling MarshalBlock.
-func MarshalBlock(dst []byte, b *Block) []byte {
-	dst = b.bh.Marshal(dst)
-	dst = encoding.MarshalBytes(dst, b.timestampsData)
-	dst = encoding.MarshalBytes(dst, b.valuesData)
-	return dst
-}
-
-// Unmarshal unmarshals MetricBlock from src
-func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
-	if mb.Block == nil {
-		logger.Panicf("BUG: MetricBlock.Block must be non-nil when calling Unmarshal!")
-	} else {
-		mb.Block.Reset()
-	}
-	tail, mn, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal MetricName: %s", err)
-	}
-	mb.MetricName = append(mb.MetricName[:0], mn...)
-	src = tail
-
-	return UnmarshalBlock(mb.Block, src)
-}
-
-// UnmarshalBlock unmarshal Block from src to dst.
-//
-// dst.UnmarshalData isn't called on the block.
-func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
-	tail, err := dst.bh.Unmarshal(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal blockHeader: %s", err)
-	}
-	src = tail
-
-	tail, tds, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal timestampsData: %s", err)
-	}
-	dst.timestampsData = append(dst.timestampsData[:0], tds...)
-	src = tail
-
-	tail, vd, err := encoding.UnmarshalBytes(src)
-	if err != nil {
-		return tail, fmt.Errorf("cannot unmarshal valuesData: %s", err)
-	}
-	dst.valuesData = append(dst.valuesData[:0], vd...)
-	src = tail
-
-	return src, nil
+// BlockRef references a Block.
+//
+// BlockRef is valid only until the corresponding Search is valid,
+// i.e. it becomes invalid after Search.MustClose is called.
+type BlockRef struct {
+	p  *part
+	bh blockHeader
+}
+
+func (br *BlockRef) reset() {
+	br.p = nil
+	br.bh = blockHeader{}
+}
+
+func (br *BlockRef) init(p *part, bh *blockHeader) {
+	br.p = p
+	br.bh = *bh
+}
+
+// MustReadBlock reads block from br to dst.
+//
+// if fetchData is false, then only block header is read, otherwise all the data is read.
+func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
+	dst.Reset()
+	dst.bh = br.bh
+	if !fetchData {
+		return
+	}
+
+	dst.timestampsData = bytesutil.Resize(dst.timestampsData[:0], int(br.bh.TimestampsBlockSize))
+	br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
+
+	dst.valuesData = bytesutil.Resize(dst.valuesData[:0], int(br.bh.ValuesBlockSize))
+	br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset))
+}
+
+// MetricBlockRef contains reference to time series block for a single metric.
+type MetricBlockRef struct {
+	// The metric name
+	MetricName []byte
+
+	// The block reference. Call BlockRef.MustReadBlock in order to obtain the block.
+	BlockRef *BlockRef
 }
 
 // Search is a search for time series.
 type Search struct {
-	// MetricBlock is updated with each Search.NextMetricBlock call.
-	MetricBlock MetricBlock
+	// MetricBlockRef is updated with each Search.NextMetricBlock call.
+	MetricBlockRef MetricBlockRef
 
 	storage *Storage
 
@@ -91,8 +69,8 @@ type Search struct {
 }
 
 func (s *Search) reset() {
-	s.MetricBlock.MetricName = s.MetricBlock.MetricName[:0]
-	s.MetricBlock.Block = nil
+	s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0]
+	s.MetricBlockRef.BlockRef = nil
 
 	s.storage = nil
 	s.ts.reset()
@@ -103,7 +81,7 @@ func (s *Search) reset() {
 // Init initializes s from the given storage, tfss and tr.
 //
 // MustClose must be called when the search is done.
-func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
+func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
 	if s.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -118,7 +96,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchD
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
-	s.ts.Init(storage.tb, tsids, tr, fetchData)
+	s.ts.Init(storage.tb, tsids, tr)
 
 	if err != nil {
 		s.err = err
@@ -145,15 +123,15 @@ func (s *Search) Error() error {
 	return s.err
 }
 
-// NextMetricBlock proceeds to the next MetricBlock.
+// NextMetricBlock proceeds to the next MetricBlockRef.
 func (s *Search) NextMetricBlock() bool {
 	if s.err != nil {
 		return false
 	}
 	for s.ts.NextBlock() {
-		tsid := &s.ts.Block.bh.TSID
+		tsid := &s.ts.BlockRef.bh.TSID
 		var err error
-		s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID)
+		s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
 		if err != nil {
 			if err == io.EOF {
 				// Skip missing metricName for tsid.MetricID.
@@ -163,7 +141,7 @@ func (s *Search) NextMetricBlock() bool {
 			s.err = err
 			return false
 		}
-		s.MetricBlock.Block = s.ts.Block
+		s.MetricBlockRef.BlockRef = s.ts.BlockRef
 		return true
 	}
 	if err := s.ts.Error(); err != nil {
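The fetchData flag now controls only how much of a referenced block is materialized: with fetchData=false, BlockRef.MustReadBlock copies just the block header, which is enough for cheap per-block statistics. A small sketch under that assumption (the helper below is illustrative, not part of the commit):

package example

import (
    "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// countRows drains an initialized storage.Search in header-only mode and
// sums the row counts recorded in the block headers, without touching the
// timestamps and values payload on disk.
func countRows(sr *storage.Search) (int, error) {
    rows := 0
    var b storage.Block
    for sr.NextMetricBlock() {
        sr.MetricBlockRef.BlockRef.MustReadBlock(&b, false) // header only
        rows += b.RowsCount()
    }
    if err := sr.Error(); err != nil {
        return 0, err
    }
    return rows, nil
}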
@@ -208,15 +208,20 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
 		expectedMrs = append(expectedMrs, *mr)
 	}
 
+	type metricBlock struct {
+		MetricName []byte
+		Block      *Block
+	}
+
 	// Search
-	s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
-	var mbs []MetricBlock
+	s.Init(st, []*TagFilters{tfs}, tr, 1e5)
+	var mbs []metricBlock
 	for s.NextMetricBlock() {
 		var b Block
-		b.CopyFrom(s.MetricBlock.Block)
+		s.MetricBlockRef.BlockRef.MustReadBlock(&b, true)
 
-		var mb MetricBlock
-		mb.MetricName = append(mb.MetricName, s.MetricBlock.MetricName...)
+		var mb metricBlock
+		mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...)
 		mb.Block = &b
 		mbs = append(mbs, mb)
 	}
@@ -583,24 +583,13 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
 		MaxTimestamp: 2e10,
 	}
 	metricBlocksCount := func(tfs *TagFilters) int {
-		// Verify the number of blocks with fetchData=true
+		// Verify the number of blocks
 		n := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
+		sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
 		for sr.NextMetricBlock() {
 			n++
 		}
 		sr.MustClose()
-
-		// Make sure the number of blocks with fetchData=false is the same.
-		m := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
-		for sr.NextMetricBlock() {
-			m++
-		}
-		sr.MustClose()
-		if n != m {
-			return -1
-		}
 		return n
 	}
 	for i := 0; i < metricsCount; i++ {
@@ -11,7 +11,7 @@ import (
 
 // tableSearch performs searches in the table.
 type tableSearch struct {
-	Block *Block
+	BlockRef *BlockRef
 
 	tb *table
 
@@ -29,7 +29,7 @@ type tableSearch struct {
 }
 
 func (ts *tableSearch) reset() {
-	ts.Block = nil
+	ts.BlockRef = nil
 	ts.tb = nil
 
 	for i := range ts.ptws {
@@ -58,7 +58,7 @@ func (ts *tableSearch) reset() {
 // tsids cannot be modified after the Init call, since it is owned by ts.
 //
 // MustClose must be called then the tableSearch is done.
-func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
+func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	if ts.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -89,7 +89,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 	}
 	ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
 	for i, ptw := range ts.ptws {
-		ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
+		ts.ptsPool[i].Init(ptw.pt, tsids, tr)
 	}
 
 	// Initialize the ptsHeap.
@@ -115,13 +115,13 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData boo
 		return
 	}
 	heap.Init(&ts.ptsHeap)
-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	ts.nextBlockNoop = true
 }
 
 // NextBlock advances to the next block.
 //
-// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
+// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
 // for the same TSID may contain overlapped time ranges.
 func (ts *tableSearch) NextBlock() bool {
 	if ts.err != nil {
@@ -146,7 +146,7 @@ func (ts *tableSearch) nextBlock() error {
 	ptsMin := ts.ptsHeap[0]
 	if ptsMin.NextBlock() {
 		heap.Fix(&ts.ptsHeap, 0)
-		ts.Block = ts.ptsHeap[0].Block
+		ts.BlockRef = ts.ptsHeap[0].BlockRef
 		return nil
 	}
 
@@ -160,7 +160,7 @@ func (ts *tableSearch) nextBlock() error {
 		return io.EOF
 	}
 
-	ts.Block = ts.ptsHeap[0].Block
+	ts.BlockRef = ts.ptsHeap[0].BlockRef
 	return nil
 }
 
@@ -192,7 +192,7 @@ func (ptsh *partitionSearchHeap) Len() int {
 
 func (ptsh *partitionSearchHeap) Less(i, j int) bool {
 	x := *ptsh
-	return x[i].Block.bh.Less(&x[j].Block.bh)
+	return x[i].BlockRef.bh.Less(&x[j].BlockRef.bh)
 }
 
 func (ptsh *partitionSearchHeap) Swap(i, j int) {
@@ -251,10 +251,10 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 
 	bs := []Block{}
 	var ts tableSearch
-	ts.Init(tb, tsids, tr, true)
+	ts.Init(tb, tsids, tr)
 	for ts.NextBlock() {
 		var b Block
-		b.CopyFrom(ts.Block)
+		ts.BlockRef.MustReadBlock(&b, true)
 		bs = append(bs, b)
 	}
 	if err := ts.Error(); err != nil {
@@ -276,23 +276,14 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 	}
 
 	// verify that empty tsids returns empty result
-	ts.Init(tb, []TSID{}, tr, true)
+	ts.Init(tb, []TSID{}, tr)
 	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.BlockRef)
 	}
 	if err := ts.Error(); err != nil {
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
 	}
 	ts.MustClose()
-
-	ts.Init(tb, []TSID{}, tr, false)
-	if ts.NextBlock() {
-		return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
-	}
-	ts.MustClose()
 
 	return nil
 }
@@ -127,12 +127,14 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int,
 	b.RunParallel(func(pb *testing.PB) {
 		var ts tableSearch
 		tsids := make([]TSID, tsidsSearch)
+		var tmpBlock Block
 		for pb.Next() {
 			for i := range tsids {
 				tsids[i].MetricID = 1 + uint64(i)
			}
-			ts.Init(tb, tsids, tr, fetchData)
+			ts.Init(tb, tsids, tr)
 			for ts.NextBlock() {
+				ts.BlockRef.MustReadBlock(&tmpBlock, fetchData)
 			}
 			ts.MustClose()
 		}