diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index a199f399b..5f61882c3 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -45,9 +45,10 @@ func (r *Result) reset() {
 
 // Results holds results returned from ProcessSearchQuery.
 type Results struct {
-	at       *auth.Token
-	tr       storage.TimeRange
-	deadline Deadline
+	at        *auth.Token
+	tr        storage.TimeRange
+	fetchData bool
+	deadline  Deadline
 
 	tbf *tmpBlocksFile
 
@@ -100,10 +101,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
 			break
 		}
-		if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.at, maxWorkersCount); err != nil {
+		if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, rss.at, maxWorkersCount); err != nil {
 			break
 		}
-		if len(rs.Timestamps) == 0 {
+		if len(rs.Timestamps) == 0 && rss.fetchData {
 			// Skip empty blocks.
 			continue
 		}
@@ -146,7 +147,7 @@ type packedTimeseries struct {
 }
 
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, at *auth.Token, maxWorkersCount int) error {
+func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token, maxWorkersCount int) error {
 	dst.reset()
 
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
@@ -173,7 +174,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 			var err error
 			for addr := range workCh {
 				sb := getSortBlock()
-				if err = sb.unpackFrom(tbf, addr, tr, at); err != nil {
+				if err = sb.unpackFrom(tbf, addr, tr, fetchData, at); err != nil {
 					break
 				}
 
@@ -292,10 +293,12 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
 
-func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, at *auth.Token) error {
+func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
 	tbf.MustReadBlockAt(&sb.b, addr)
-	if err := sb.b.UnmarshalData(); err != nil {
-		return fmt.Errorf("cannot unmarshal block: %s", err)
+	if fetchData {
+		if err := sb.b.UnmarshalData(); err != nil {
+			return fmt.Errorf("cannot unmarshal block: %s", err)
+		}
 	}
 
 	timestamps := sb.b.Timestamps()
@@ -692,7 +695,7 @@ func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
 }
 
 // ProcessSearchQuery performs sq on storage nodes until the given deadline.
-func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (*Results, bool, error) {
+func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
 	requestData := sq.Marshal(nil)
 
 	// Send the query to all the storage nodes in parallel.
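// Editor's sketch (not part of the patch): the calling convention the new
// fetchData argument enables. With fetchData=false each unpacked Result
// carries only its MetricName; Timestamps/Values stay empty, which is why the
// empty-block skip above is now guarded by rss.fetchData. The helper name
// listSeriesNames and its use of MetricGroup are illustrative, not code from
// this diff.
package example

import (
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

func listSeriesNames(at *auth.Token, sq *storage.SearchQuery, deadline netstorage.Deadline) ([]string, error) {
	// fetchData=false: vmstorage returns block metadata without
	// timestamps/values, so only series identity crosses the network.
	rss, _, err := netstorage.ProcessSearchQuery(at, sq, false, deadline)
	if err != nil {
		return nil, err
	}
	var mu sync.Mutex
	var names []string
	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
		mu.Lock()
		names = append(names, string(rs.MetricName.MetricGroup))
		mu.Unlock()
	})
	return names, err
}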
@@ -708,7 +711,7 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadli
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.searchRequests.Inc()
-			results, err := sn.processSearchQuery(requestData, tr, deadline)
+			results, err := sn.processSearchQuery(requestData, tr, fetchData, deadline)
 			if err != nil {
 				sn.searchRequestErrors.Inc()
 				err = fmt.Errorf("cannot perform search on vmstorage %s: %s", sn.connPool.Addr(), err)
@@ -769,6 +772,7 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadli
 	rss.packedTimeseries = make([]packedTimeseries, len(m))
 	rss.at = at
 	rss.tr = tr
+	rss.fetchData = fetchData
 	rss.deadline = deadline
 	rss.tbf = tbf
 	i := 0
@@ -931,20 +935,20 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead
 	return n, nil
 }
 
-func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, deadline Deadline) ([]*storage.MetricBlock, error) {
+func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) ([]*storage.MetricBlock, error) {
 	var results []*storage.MetricBlock
 	f := func(bc *handshake.BufferedConn) error {
-		rs, err := sn.processSearchQueryOnConn(bc, requestData, tr)
+		rs, err := sn.processSearchQueryOnConn(bc, requestData, tr, fetchData)
 		if err != nil {
 			return err
 		}
 		results = rs
 		return nil
 	}
-	if err := sn.execOnConn("search_v2", f, deadline); err != nil {
+	if err := sn.execOnConn("search_v3", f, deadline); err != nil {
 		// Try again before giving up.
 		results = nil
-		if err = sn.execOnConn("search_v2", f, deadline); err != nil {
+		if err = sn.execOnConn("search_v3", f, deadline); err != nil {
 			return nil, err
 		}
 	}
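// Editor's sketch (not part of the patch): why the rpcName is bumped from
// "search_v2" to "search_v3". The request now carries one extra trailing
// fetchData byte, so old and new peers must not speak under the same RPC name
// during a rolling upgrade: a vmstorage that only knows "search_v2" rejects
// the unknown "search_v3" outright instead of leaving an unread byte on the
// wire. encodeSearchV3Payload is a hypothetical helper mirroring the writes in
// processSearchQueryOnConn below, assuming the big-endian length prefix used
// by writeBytes.
package example

import "encoding/binary"

func encodeSearchV3Payload(requestData []byte, fetchData bool) []byte {
	buf := make([]byte, 0, 8+len(requestData)+1)
	buf = binary.BigEndian.AppendUint64(buf, uint64(len(requestData))) // writeBytes: length prefix
	buf = append(buf, requestData...)                                  // writeBytes: payload
	if fetchData {                                                     // writeBool: single byte
		buf = append(buf, 1)
	} else {
		buf = append(buf, 0)
	}
	return buf
}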
@@ -1197,11 +1201,14 @@ const maxMetricBlockSize = 1024 * 1024
 
 // from vmstorage.
 const maxErrorMessageSize = 64 * 1024
 
-func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange) ([]*storage.MetricBlock, error) {
+func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) ([]*storage.MetricBlock, error) {
 	// Send the request to sn.
 	if err := writeBytes(bc, requestData); err != nil {
 		return nil, fmt.Errorf("cannot write requestData: %s", err)
 	}
+	if err := writeBool(bc, fetchData); err != nil {
+		return nil, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err)
+	}
 	if err := bc.Flush(); err != nil {
 		return nil, fmt.Errorf("cannot flush requestData to conn: %s", err)
 	}
@@ -1265,6 +1272,17 @@ func writeUint32(bc *handshake.BufferedConn, n uint32) error {
 	return nil
 }
 
+func writeBool(bc *handshake.BufferedConn, b bool) error {
+	var buf [1]byte
+	if b {
+		buf[0] = 1
+	}
+	if _, err := bc.Write(buf[:]); err != nil {
+		return err
+	}
+	return nil
+}
+
 func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, error) {
 	buf = bytesutil.Resize(buf, 8)
 	if _, err := io.ReadFull(bc, buf); err != nil {
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 0586789b5..f79330b60 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -72,7 +72,7 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err
 		MaxTimestamp: end,
 		TagFilterss:  tagFilterss,
 	}
-	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
 	}
@@ -169,7 +169,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star
 		MaxTimestamp: end,
 		TagFilterss:  tagFilterss,
 	}
-	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
 	}
@@ -405,7 +405,7 @@ func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
 		MaxTimestamp: end,
 		TagFilterss:  tagFilterss,
 	}
-	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, false, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
 	}
diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index ead47a162..50062e4b5 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -578,8 +578,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
 		MaxTimestamp: ec.End + ec.Step,
 		TagFilterss:  [][]storage.TagFilter{me.TagFilters},
 	}
-
-	rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, true, ec.Deadline)
 	if err != nil {
 		return nil, err
 	}
diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go
index 41421e697..a5a2cd6e6 100644
--- a/app/vmstorage/transport/server.go
+++ b/app/vmstorage/transport/server.go
@@ -392,6 +392,18 @@ func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
 	return nil
 }
 
+func (ctx *vmselectRequestCtx) readBool() (bool, error) {
+	ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, 1)
+	if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil {
+		if err == io.EOF {
+			return false, err
+		}
+		return false, fmt.Errorf("cannot read bool: %s", err)
+	}
+	v := ctx.dataBuf[0] != 0
+	return v, nil
+}
+
 func (ctx *vmselectRequestCtx) writeDataBufBytes() error {
 	if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil {
 		return fmt.Errorf("cannot write data size: %s", err)
 	}
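// Editor's sketch (not part of the patch): the one-byte bool encoding shared
// by writeBool (vmselect, earlier in this diff) and readBool (vmstorage,
// above), round-tripped through a bytes.Buffer instead of a
// handshake.BufferedConn. Any non-zero byte decodes as true, matching
// `ctx.dataBuf[0] != 0`.
package main

import (
	"bytes"
	"fmt"
	"io"
)

func writeBool(w io.Writer, b bool) error {
	var buf [1]byte
	if b {
		buf[0] = 1
	}
	_, err := w.Write(buf[:])
	return err
}

func readBool(r io.Reader) (bool, error) {
	var buf [1]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return false, fmt.Errorf("cannot read bool: %s", err)
	}
	return buf[0] != 0, nil
}

func main() {
	var conn bytes.Buffer // stands in for the buffered connection
	_ = writeBool(&conn, true)
	v, err := readBool(&conn)
	fmt.Println(v, err) // true <nil>
}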
@@ -443,7 +455,7 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
 	}()
 
 	switch string(ctx.dataBuf) {
-	case "search_v2":
+	case "search_v3":
 		return s.processVMSelectSearchQuery(ctx)
 	case "labelValues":
 		return s.processVMSelectLabelValues(ctx)
@@ -714,6 +726,10 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
 	if len(tail) > 0 {
 		return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail)
 	}
+	fetchData, err := ctx.readBool()
+	if err != nil {
+		return fmt.Errorf("cannot read `fetchData` bool: %s", err)
+	}
 
 	// Setup search.
 	if err := ctx.setupTfss(); err != nil {
@@ -728,7 +744,7 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
 		MinTimestamp: ctx.sq.MinTimestamp,
 		MaxTimestamp: ctx.sq.MaxTimestamp,
 	}
-	ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch)
+	ctx.sr.Init(s.storage, ctx.tfss, tr, fetchData, *maxMetricsPerSearch)
 	defer ctx.sr.MustClose()
 	if err := ctx.sr.Error(); err != nil {
 		// Send the error message to vmselect.
diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go
index cfbac8cf0..b25fe59a0 100644
--- a/lib/storage/part_search.go
+++ b/lib/storage/part_search.go
@@ -28,6 +28,9 @@ type partSearch struct {
 	// tr is a time range to search.
 	tr TimeRange
 
+	// Skip populating timestampsData and valuesData in Block if fetchData=false.
+	fetchData bool
+
 	metaindex []metaindexRow
 
 	ibCache *indexBlockCache
@@ -61,7 +64,7 @@ func (ps *partSearch) reset() {
 }
 
 // Init initializes the ps with the given p, tsids and tr.
-func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
+func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
 	ps.reset()
 	ps.p = p
 
@@ -72,6 +75,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
 		ps.tsids = append(ps.tsids[:0], tsids...)
 	}
 	ps.tr = tr
+	ps.fetchData = fetchData
 	ps.metaindex = p.metaindex
 	ps.ibCache = &p.ibCache
 
@@ -281,11 +285,14 @@ func (ps *partSearch) searchBHS() bool {
 
 func (ps *partSearch) readBlock(bh *blockHeader) {
 	ps.Block.Reset()
+	ps.Block.bh = *bh
+	if !ps.fetchData {
+		return
+	}
+
 	ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
 	ps.p.timestampsFile.ReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
 	ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
 	ps.p.valuesFile.ReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
-
-	ps.Block.bh = *bh
 }
diff --git a/lib/storage/part_search_test.go b/lib/storage/part_search_test.go
index a56f04f01..11166debf 100644
--- a/lib/storage/part_search_test.go
+++ b/lib/storage/part_search_test.go
@@ -1247,7 +1247,7 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR
 
 func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
 	var ps partSearch
-	ps.Init(p, tsids, tr)
+	ps.Init(p, tsids, tr, true)
 	var bs []Block
 	for ps.NextBlock() {
 		var b Block
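// Editor's sketch (not part of the patch): the control flow readBlock gains
// above. `ps.Block.bh = *bh` moves ahead of the data reads so a
// fetchData=false search still yields valid block headers (row counts, time
// ranges) while skipping the two file reads that dominate search cost. The
// types below are trimmed stand-ins for lib/storage's block/blockHeader.
package main

import "fmt"

type blockHeader struct {
	RowsCount           uint32
	TimestampsBlockSize uint32
	ValuesBlockSize     uint32
}

type block struct {
	bh             blockHeader
	timestampsData []byte
	valuesData     []byte
}

// readBlock mirrors partSearch.readBlock after this diff: the header is
// always copied, the expensive data reads are gated on fetchData.
func readBlock(dst *block, bh *blockHeader, fetchData bool, readAt func(size uint32) []byte) {
	*dst = block{}
	dst.bh = *bh
	if !fetchData {
		return
	}
	dst.timestampsData = readAt(bh.TimestampsBlockSize)
	dst.valuesData = readAt(bh.ValuesBlockSize)
}

func main() {
	bh := blockHeader{RowsCount: 8192, TimestampsBlockSize: 4096, ValuesBlockSize: 6000}
	var b block
	readBlock(&b, &bh, false, func(size uint32) []byte { return make([]byte, size) })
	fmt.Println(b.bh.RowsCount, len(b.timestampsData)) // 8192 0: header usable, no data read
}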
diff --git a/lib/storage/partition_search.go b/lib/storage/partition_search.go
index 815ec3589..475ffed5a 100644
--- a/lib/storage/partition_search.go
+++ b/lib/storage/partition_search.go
@@ -56,7 +56,7 @@ func (pts *partitionSearch) reset() {
 // Init initializes the search in the given partition for the given tsid and tr.
 //
 // MustClose must be called when partition search is done.
-func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
+func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
 	if pts.needClosing {
 		logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
 	}
@@ -85,7 +85,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
 	}
 	pts.psPool = pts.psPool[:len(pts.pws)]
 	for i, pw := range pts.pws {
-		pts.psPool[i].Init(pw.p, tsids, tr)
+		pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
 	}
 
 	// Initialize the psHeap.
diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go
index 0b23ecc16..fba878590 100644
--- a/lib/storage/partition_search_test.go
+++ b/lib/storage/partition_search_test.go
@@ -238,7 +238,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 
 	bs := []Block{}
 	var pts partitionSearch
-	pts.Init(pt, tsids, tr)
+	pts.Init(pt, tsids, tr, true)
 	for pts.NextBlock() {
 		var b Block
 		b.CopyFrom(pts.Block)
@@ -263,7 +263,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 	}
 
 	// verify that empty tsids returns empty result
-	pts.Init(pt, []TSID{}, tr)
+	pts.Init(pt, []TSID{}, tr, true)
 	if pts.NextBlock() {
 		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
 	}
@@ -271,6 +271,16 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
 	}
 	pts.MustClose()
+
+	pts.Init(pt, []TSID{}, tr, false)
+	if pts.NextBlock() {
+		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
+	}
+	if err := pts.Error(); err != nil {
+		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
+	}
+	pts.MustClose()
+
 	return nil
 }
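// Editor's sketch (not part of the patch): the empty-tsids check added above
// duplicates the fetchData=true block verbatim; a table-driven loop over both
// values would keep the two paths in lockstep. This fragment assumes the
// enclosing testPartitionSearchSerial scope (pt, tr, pts, fmt).
for _, fetchData := range []bool{true, false} {
	pts.Init(pt, []TSID{}, tr, fetchData)
	if pts.NextBlock() {
		return fmt.Errorf("unexpected block for empty tsids, fetchData=%v: %+v", fetchData, pts.Block)
	}
	if err := pts.Error(); err != nil {
		return fmt.Errorf("unexpected error for empty tsids, fetchData=%v: %s", fetchData, err)
	}
	pts.MustClose()
}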
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 468824f28..0433c8aa3 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -110,7 +110,7 @@ func (s *Search) reset() {
 // Init initializes s from the given storage, tfss and tr.
 //
 // MustClose must be called when the search is done.
-func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
+func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
 	if s.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -123,7 +123,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMet
 	// It is ok to call Init on error from storage.searchTSIDs.
 	// Init must be called before returning because it will fail
 	// on Seach.MustClose otherwise.
-	s.ts.Init(storage.tb, tsids, tr)
+	s.ts.Init(storage.tb, tsids, tr, fetchData)
 
 	if err != nil {
 		s.err = err
diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go
index 69c9512c3..924baf845 100644
--- a/lib/storage/search_test.go
+++ b/lib/storage/search_test.go
@@ -200,7 +200,7 @@ func testSearch(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) e
 	}
 
 	// Search
-	s.Init(st, []*TagFilters{tfs}, tr, 1e5)
+	s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
 	var mbs []MetricBlock
 	for s.NextMetricBlock() {
 		var b Block
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index a73684eec..958fca790 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -506,12 +506,24 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
 		MaxTimestamp: 2e10,
 	}
 	metricBlocksCount := func(tfs *TagFilters) int {
+		// Verify the number of blocks with fetchData=true
 		n := 0
-		sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
+		sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
 		for sr.NextMetricBlock() {
 			n++
 		}
 		sr.MustClose()
+
+		// Make sure the number of blocks with fetchData=false is the same.
+		m := 0
+		sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
+		for sr.NextMetricBlock() {
+			m++
+		}
+		sr.MustClose()
+		if n != m {
+			return -1
+		}
 		return n
 	}
 	for i := 0; i < metricsCount; i++ {
diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go
index 8a9c31129..aa5b81476 100644
--- a/lib/storage/table_search.go
+++ b/lib/storage/table_search.go
@@ -55,7 +55,7 @@ func (ts *tableSearch) reset() {
 // Init initializes the ts.
 //
 // MustClose must be called then the tableSearch is done.
-func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
+func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
 	if ts.needClosing {
 		logger.Panicf("BUG: missing MustClose call before the next call to Init")
 	}
@@ -86,7 +86,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	}
 	ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
 	for i, ptw := range ts.ptws {
-		ts.ptsPool[i].Init(ptw.pt, tsids, tr)
+		ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
 	}
 
 	// Initialize the ptsHeap.
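// Editor's sketch (not part of the patch): the invariant encoded by the
// storage_test.go change above — a search must visit the same blocks whether
// or not data is fetched, since fetchData only suppresses reading block
// payloads, never block selection. countMetricBlocks is a hypothetical helper
// inside package storage using the Search API from this diff.
func countMetricBlocks(s *Storage, tfs *TagFilters, tr TimeRange, fetchData bool) int {
	var sr Search
	sr.Init(s, []*TagFilters{tfs}, tr, fetchData, 1e5)
	n := 0
	for sr.NextMetricBlock() {
		n++
	}
	sr.MustClose()
	return n
}

// Usage: countMetricBlocks(s, tfs, tr, true) == countMetricBlocks(s, tfs, tr, false)
// must hold for any tfs and tr.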
diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go
index c116e4e84..6c75a1a08 100644
--- a/lib/storage/table_search_test.go
+++ b/lib/storage/table_search_test.go
@@ -251,7 +251,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 
 	bs := []Block{}
 	var ts tableSearch
-	ts.Init(tb, tsids, tr)
+	ts.Init(tb, tsids, tr, true)
 	for ts.NextBlock() {
 		var b Block
 		b.CopyFrom(ts.Block)
@@ -276,7 +276,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 	}
 
 	// verify that empty tsids returns empty result
-	ts.Init(tb, []TSID{}, tr)
+	ts.Init(tb, []TSID{}, tr, true)
 	if ts.NextBlock() {
 		return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
 	}
@@ -284,5 +284,15 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
 		return fmt.Errorf("unexpected error on empty tsids list: %s", err)
 	}
 	ts.MustClose()
+
+	ts.Init(tb, []TSID{}, tr, false)
+	if ts.NextBlock() {
+		return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
+	}
+	if err := ts.Error(); err != nil {
+		return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
+	}
+	ts.MustClose()
+
 	return nil
 }
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index 917664de6..df9af44f4 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -26,7 +26,11 @@ func BenchmarkTableSearch(b *testing.B) {
 		b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) {
 			for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} {
 				b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) {
-					benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch)
+					for _, fetchData := range []bool{true, false} {
+						b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) {
+							benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch, fetchData)
+						})
+					}
 				})
 			}
 		})
@@ -103,9 +107,9 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 	tb.MustClose()
 }
 
-func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
+func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
 	startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
-	rowsPerInsert := int(maxRawRowsPerPartition)
+	rowsPerInsert := getMaxRawRowsPerPartition()
 	tb := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 	tr := TimeRange{
@@ -127,7 +131,7 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int)
 	for i := range tsids {
 		tsids[i].MetricID = 1 + uint64(i)
 	}
-	ts.Init(tb, tsids, tr)
+	ts.Init(tb, tsids, tr, fetchData)
 	for ts.NextBlock() {
 	}
 	ts.MustClose()
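// Editor's sketch (not part of the patch): how the new fetchData dimension
// surfaces in benchmark output. Nested b.Run names compose with "/", so
// `go test -bench=TableSearch ./lib/storage` reports paired rows such as
// BenchmarkTableSearch/.../tsidsSearch_100/fetchData_true-8 and a matching
// fetchData_false row; the false rows isolate index traversal and block
// enumeration from data reads. Minimal standalone example of the pattern
// (belongs in a _test.go file):
package example

import (
	"fmt"
	"testing"
)

func BenchmarkSearch(b *testing.B) {
	for _, fetchData := range []bool{true, false} {
		b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				_ = fetchData // the measured search would run here
			}
		})
	}
}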