diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 243f744f02..6501de6f30 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -61,10 +61,9 @@ func (r *Result) reset() { // Results holds results returned from ProcessSearchQuery. type Results struct { - at *auth.Token - tr storage.TimeRange - fetchData bool - deadline searchutils.Deadline + at *auth.Token + tr storage.TimeRange + deadline searchutils.Deadline tbf *tmpBlocksFile @@ -151,11 +150,11 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error { atomic.StoreUint32(tsw.mustStop, 1) return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) } - if err := tsw.pts.Unpack(r, rss.tbf, rss.tr, rss.fetchData, rss.at); err != nil { + if err := tsw.pts.Unpack(r, rss.tbf, rss.tr, rss.at); err != nil { atomic.StoreUint32(tsw.mustStop, 1) return fmt.Errorf("error during time series unpacking: %w", err) } - if len(r.Timestamps) > 0 || !rss.fetchData { + if len(r.Timestamps) > 0 { if err := tsw.f(r, workerID); err != nil { atomic.StoreUint32(tsw.mustStop, 1) return err @@ -387,15 +386,11 @@ var tmpBlockPool sync.Pool var unpackBatchSize = 5000 // Unpack unpacks pts to dst. -func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool, at *auth.Token) error { +func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, at *auth.Token) error { dst.reset() if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil { return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err) } - if !fetchData { - // Do not spend resources on data reading and unpacking. - return nil - } // Spin up local workers. // Do not use global workers pool, since it increases inter-CPU memory ping-poing, @@ -1167,7 +1162,7 @@ func ExportBlocks(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuer atomic.AddUint64(&samples, uint64(mb.Block.RowsCount())) return nil } - _, err := processSearchQuery(qt, at, true, sq, true, processBlock, deadline) + _, err := processSearchQuery(qt, at, true, sq, processBlock, deadline) // Make sure processBlock isn't called anymore in order to prevent from data races. atomic.StoreUint32(&stopped, 1) @@ -1239,9 +1234,8 @@ func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon // ProcessSearchQuery performs sq until the given deadline. // // Results.RunParallel or Results.Cancel must be called on the returned Results. -func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, - fetchData bool, deadline searchutils.Deadline) (*Results, bool, error) { - qt = qt.NewChild("fetch matching series: %s, fetchData=%v", sq, fetchData) +func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) (*Results, bool, error) { + qt = qt.NewChild("fetch matching series: %s", sq) defer qt.Done() if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) @@ -1266,10 +1260,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo if atomic.LoadUint32(&stopped) != 0 { return nil } - if !fetchData { - tbfw.RegisterEmptyBlock(mb) - return nil - } atomic.AddUint64(&blocksRead, 1) n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount())) if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) { @@ -1280,7 +1270,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo } return nil } - isPartial, err := processSearchQuery(qt, at, denyPartialResponse, sq, fetchData, processBlock, deadline) + isPartial, err := processSearchQuery(qt, at, denyPartialResponse, sq, processBlock, deadline) // Make sure processBlock isn't called anymore in order to protect from data races. atomic.StoreUint32(&stopped, 1) @@ -1299,7 +1289,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo var rss Results rss.at = at rss.tr = tr - rss.fetchData = fetchData rss.deadline = deadline rss.tbf = tbfw.tbf pts := make([]packedTimeseries, len(tbfw.orderedMetricNames)) @@ -1313,14 +1302,14 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo return &rss, isPartial, nil } -func processSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, fetchData bool, +func processSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) { requestData := sq.Marshal(nil) // Send the query to all the storage nodes in parallel. snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} { sn.searchRequests.Inc() - err := sn.processSearchQuery(qt, requestData, fetchData, processBlock, deadline) + err := sn.processSearchQuery(qt, requestData, processBlock, deadline) if err != nil { sn.searchErrors.Inc() err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) @@ -1619,15 +1608,14 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD return metricNames, nil } -func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, fetchData bool, - processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error { +func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error { f := func(bc *handshake.BufferedConn) error { - if err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock); err != nil { + if err := sn.processSearchQueryOnConn(bc, requestData, processBlock); err != nil { return err } return nil } - return sn.execOnConnWithPossibleRetry(qt, "search_v6", f, deadline) + return sn.execOnConnWithPossibleRetry(qt, "search_v7", f, deadline) } func (sn *storageNode) execOnConnWithPossibleRetry(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error { @@ -2119,14 +2107,11 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn const maxMetricNameSize = 64 * 1024 -func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) error { +func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, processBlock func(mb *storage.MetricBlock) error) error { // Send the request to sn. if err := writeBytes(bc, requestData); err != nil { return fmt.Errorf("cannot write requestData: %w", err) } - if err := writeBool(bc, fetchData); err != nil { - return fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err) - } if err := bc.Flush(); err != nil { return fmt.Errorf("cannot flush requestData to conn: %w", err) } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index fb114ef5dd..af72f6089e 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -78,7 +78,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxFederateSeries) denyPartialResponse := searchutils.GetDenyPartialResponse(r) - rss, isPartial, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, true, cp.deadline) + rss, isPartial, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -145,7 +145,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter // Unconditionally deny partial response for the exported data, // since users usually expect that the exported data is full. denyPartialResponse := true - rss, _, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, true, cp.deadline) + rss, _, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -350,7 +350,7 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter // Unconditionally deny partial response for the exported data, // since users usually expect that the exported data is full. denyPartialResponse := true - rss, _, err := netstorage.ProcessSearchQuery(qt, at, denyPartialResponse, sq, true, cp.deadline) + rss, _, err := netstorage.ProcessSearchQuery(qt, at, denyPartialResponse, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -674,67 +674,20 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, cp.start = cp.end - defaultStep } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxSeriesLimit) - qtDone := func() { - qt.Donef("start=%d, end=%d", cp.start, cp.end) - } denyPartialResponse := searchutils.GetDenyPartialResponse(r) - if cp.end-cp.start > 24*3600*1000 { - // It is cheaper to call SearchMetricNames on time ranges exceeding a day. - mns, isPartial, err := netstorage.SearchMetricNames(qt, at, denyPartialResponse, sq, cp.deadline) - if err != nil { - return fmt.Errorf("cannot fetch time series for %q: %w", sq, err) - } - w.Header().Set("Content-Type", "application/json") - bw := bufferedwriter.Get(w) - defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer) - go func() { - for i := range mns { - bb := quicktemplate.AcquireByteBuffer() - writemetricNameObject(bb, &mns[i]) - resultsCh <- bb - } - close(resultsCh) - }() - // WriteSeriesResponse must consume all the data from resultsCh. - WriteSeriesResponse(bw, isPartial, resultsCh, qt, qtDone) - if err := bw.Flush(); err != nil { - return err - } - seriesDuration.UpdateDuration(startTime) - return nil - } - rss, isPartial, err := netstorage.ProcessSearchQuery(qt, at, denyPartialResponse, sq, false, cp.deadline) + mns, isPartial, err := netstorage.SearchMetricNames(qt, at, denyPartialResponse, sq, cp.deadline) if err != nil { - return fmt.Errorf("cannot fetch data for %q: %w", sq, err) + return fmt.Errorf("cannot fetch time series for %q: %w", sq, err) } - w.Header().Set("Content-Type", "application/json") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer) - doneCh := make(chan error) - go func() { - err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { - if err := bw.Error(); err != nil { - return err - } - bb := quicktemplate.AcquireByteBuffer() - writemetricNameObject(bb, &rs.MetricName) - resultsCh <- bb - return nil - }) - close(resultsCh) - doneCh <- err - }() - // WriteSeriesResponse must consume all the data from resultsCh. - WriteSeriesResponse(bw, isPartial, resultsCh, qt, qtDone) - if err := bw.Flush(); err != nil { - return fmt.Errorf("cannot flush series response to remote client: %w", err) + qtDone := func() { + qt.Donef("start=%d, end=%d", cp.start, cp.end) } - err = <-doneCh - if err != nil { - return fmt.Errorf("cannot send series response to remote client: %w", err) + WriteSeriesResponse(bw, isPartial, mns, qt, qtDone) + if err := bw.Flush(); err != nil { + return err } return nil } diff --git a/app/vmselect/prometheus/series_response.qtpl b/app/vmselect/prometheus/series_response.qtpl index 0d435745a4..0cb7f9aefb 100644 --- a/app/vmselect/prometheus/series_response.qtpl +++ b/app/vmselect/prometheus/series_response.qtpl @@ -1,35 +1,23 @@ {% import ( - "github.com/valyala/quicktemplate" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" ) %} {% stripspace %} SeriesResponse generates response for /api/v1/series. See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers -{% func SeriesResponse(isPartial bool, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer, qtDone func()) %} +{% func SeriesResponse(isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) %} { - {% code seriesCount := 0 %} "status":"success", "isPartial":{% if isPartial %}true{% else %}false{% endif %}, "data":[ - {% code bb, ok := <-resultsCh %} - {% if ok %} - {%z= bb.B %} - {% code - quicktemplate.ReleaseByteBuffer(bb) - seriesCount++ - %} - {% for bb := range resultsCh %} - ,{%z= bb.B %} - {% code - quicktemplate.ReleaseByteBuffer(bb) - seriesCount++ - %} - {% endfor %} - {% endif %} + {% for i := range mns %} + {%= metricNameObject(&mns[i]) %} + {% if i+1 < len(mns) %},{% endif %} + {% endfor %} ] {% code - qt.Printf("generate response: series=%d", seriesCount) + qt.Printf("generate response: series=%d", len(mns)) qtDone() %} {%= dumpQueryTrace(qt) %} diff --git a/app/vmselect/prometheus/series_response.qtpl.go b/app/vmselect/prometheus/series_response.qtpl.go index efd6abb173..ff49a935d9 100644 --- a/app/vmselect/prometheus/series_response.qtpl.go +++ b/app/vmselect/prometheus/series_response.qtpl.go @@ -7,7 +7,7 @@ package prometheus //line app/vmselect/prometheus/series_response.qtpl:1 import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" - "github.com/valyala/quicktemplate" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) // SeriesResponse generates response for /api/v1/series.See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers @@ -26,86 +26,68 @@ var ( ) //line app/vmselect/prometheus/series_response.qtpl:9 -func StreamSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer, qtDone func()) { +func StreamSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) { //line app/vmselect/prometheus/series_response.qtpl:9 - qw422016.N().S(`{`) -//line app/vmselect/prometheus/series_response.qtpl:11 - seriesCount := 0 - -//line app/vmselect/prometheus/series_response.qtpl:11 - qw422016.N().S(`"status":"success","isPartial":`) -//line app/vmselect/prometheus/series_response.qtpl:13 + qw422016.N().S(`{"status":"success","isPartial":`) +//line app/vmselect/prometheus/series_response.qtpl:12 if isPartial { -//line app/vmselect/prometheus/series_response.qtpl:13 +//line app/vmselect/prometheus/series_response.qtpl:12 qw422016.N().S(`true`) -//line app/vmselect/prometheus/series_response.qtpl:13 +//line app/vmselect/prometheus/series_response.qtpl:12 } else { -//line app/vmselect/prometheus/series_response.qtpl:13 +//line app/vmselect/prometheus/series_response.qtpl:12 qw422016.N().S(`false`) -//line app/vmselect/prometheus/series_response.qtpl:13 +//line app/vmselect/prometheus/series_response.qtpl:12 } -//line app/vmselect/prometheus/series_response.qtpl:13 +//line app/vmselect/prometheus/series_response.qtpl:12 qw422016.N().S(`,"data":[`) +//line app/vmselect/prometheus/series_response.qtpl:14 + for i := range mns { //line app/vmselect/prometheus/series_response.qtpl:15 - bb, ok := <-resultsCh - + streammetricNameObject(qw422016, &mns[i]) +//line app/vmselect/prometheus/series_response.qtpl:16 + if i+1 < len(mns) { //line app/vmselect/prometheus/series_response.qtpl:16 - if ok { -//line app/vmselect/prometheus/series_response.qtpl:17 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/series_response.qtpl:19 - quicktemplate.ReleaseByteBuffer(bb) - seriesCount++ - -//line app/vmselect/prometheus/series_response.qtpl:22 - for bb := range resultsCh { -//line app/vmselect/prometheus/series_response.qtpl:22 qw422016.N().S(`,`) -//line app/vmselect/prometheus/series_response.qtpl:23 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/series_response.qtpl:25 - quicktemplate.ReleaseByteBuffer(bb) - seriesCount++ - -//line app/vmselect/prometheus/series_response.qtpl:28 +//line app/vmselect/prometheus/series_response.qtpl:16 } -//line app/vmselect/prometheus/series_response.qtpl:29 +//line app/vmselect/prometheus/series_response.qtpl:17 } -//line app/vmselect/prometheus/series_response.qtpl:29 +//line app/vmselect/prometheus/series_response.qtpl:17 qw422016.N().S(`]`) -//line app/vmselect/prometheus/series_response.qtpl:32 - qt.Printf("generate response: series=%d", seriesCount) +//line app/vmselect/prometheus/series_response.qtpl:20 + qt.Printf("generate response: series=%d", len(mns)) qtDone() -//line app/vmselect/prometheus/series_response.qtpl:35 +//line app/vmselect/prometheus/series_response.qtpl:23 streamdumpQueryTrace(qw422016, qt) -//line app/vmselect/prometheus/series_response.qtpl:35 +//line app/vmselect/prometheus/series_response.qtpl:23 qw422016.N().S(`}`) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 } -//line app/vmselect/prometheus/series_response.qtpl:37 -func WriteSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer, qtDone func()) { -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 +func WriteSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) { +//line app/vmselect/prometheus/series_response.qtpl:25 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/series_response.qtpl:37 - StreamSeriesResponse(qw422016, isPartial, resultsCh, qt, qtDone) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 + StreamSeriesResponse(qw422016, isPartial, mns, qt, qtDone) +//line app/vmselect/prometheus/series_response.qtpl:25 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 } -//line app/vmselect/prometheus/series_response.qtpl:37 -func SeriesResponse(isPartial bool, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer, qtDone func()) string { -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 +func SeriesResponse(isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) string { +//line app/vmselect/prometheus/series_response.qtpl:25 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/series_response.qtpl:37 - WriteSeriesResponse(qb422016, isPartial, resultsCh, qt, qtDone) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 + WriteSeriesResponse(qb422016, isPartial, mns, qt, qtDone) +//line app/vmselect/prometheus/series_response.qtpl:25 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 return qs422016 -//line app/vmselect/prometheus/series_response.qtpl:37 +//line app/vmselect/prometheus/series_response.qtpl:25 } diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 25c6b8528e..b111ad3681 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -922,7 +922,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa minTimestamp -= ec.Step } sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries) - rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.AuthToken, ec.DenyPartialResponse, sq, true, ec.Deadline) + rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.AuthToken, ec.DenyPartialResponse, sq, ec.Deadline) if err != nil { return nil, err } diff --git a/app/vmstorage/transport/vmselect.go b/app/vmstorage/transport/vmselect.go index 20bf711099..5ed34c8b4f 100644 --- a/app/vmstorage/transport/vmselect.go +++ b/app/vmstorage/transport/vmselect.go @@ -119,12 +119,12 @@ func getBlockIterator() *blockIterator { return v.(*blockIterator) } -func (bi *blockIterator) NextBlock(mb *storage.MetricBlock, fetchData bool) bool { +func (bi *blockIterator) NextBlock(mb *storage.MetricBlock) bool { if !bi.sr.NextMetricBlock() { return false } mb.MetricName = bi.sr.MetricBlockRef.MetricName - bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block, fetchData) + bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block) return true } diff --git a/lib/storage/part_search_test.go b/lib/storage/part_search_test.go index 6a34bded21..f07024c9e0 100644 --- a/lib/storage/part_search_test.go +++ b/lib/storage/part_search_test.go @@ -1251,7 +1251,7 @@ func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks var bs []Block for ps.NextBlock() { var b Block - ps.BlockRef.MustReadBlock(&b, true) + ps.BlockRef.MustReadBlock(&b) bs = append(bs, b) } if err := ps.Error(); err != nil { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 2ea714e33d..630796d362 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -245,7 +245,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp pts.Init(pt, tsids, tr) for pts.NextBlock() { var b Block - pts.BlockRef.MustReadBlock(&b, true) + pts.BlockRef.MustReadBlock(&b) bs = append(bs, b) } if err := pts.Error(); err != nil { diff --git a/lib/storage/search.go b/lib/storage/search.go index 11f9c301ad..c4ff994e7c 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -33,14 +33,9 @@ func (br *BlockRef) init(p *part, bh *blockHeader) { } // MustReadBlock reads block from br to dst. -// -// if fetchData is false, then only block header is read, otherwise all the data is read. -func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) { +func (br *BlockRef) MustReadBlock(dst *Block) { dst.Reset() dst.bh = br.bh - if !fetchData { - return - } dst.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(dst.timestampsData, int(br.bh.TimestampsBlockSize)) br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset)) diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 714eb2aadf..23dbc9ac7c 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -213,7 +213,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun var mbs []metricBlock for s.NextMetricBlock() { var b Block - s.MetricBlockRef.BlockRef.MustReadBlock(&b, true) + s.MetricBlockRef.BlockRef.MustReadBlock(&b) var mb metricBlock mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...) diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index bd0889df9d..fe30243dc9 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -255,7 +255,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected [] ts.Init(tb, tsids, tr) for ts.NextBlock() { var b Block - ts.BlockRef.MustReadBlock(&b, true) + ts.BlockRef.MustReadBlock(&b) bs = append(bs, b) } if err := ts.Error(); err != nil { diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index 193ea1525d..3427650764 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -27,11 +27,7 @@ func BenchmarkTableSearch(b *testing.B) { b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) { for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} { b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) { - for _, fetchData := range []bool{true, false} { - b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) { - benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch, fetchData) - }) - } + benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch) }) } }) @@ -110,7 +106,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn tb.MustClose() } -func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) { +func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) { startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000 rowsPerInsert := getMaxRawRowsPerShard() @@ -137,7 +133,7 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, } ts.Init(tb, tsids, tr) for ts.NextBlock() { - ts.BlockRef.MustReadBlock(&tmpBlock, fetchData) + ts.BlockRef.MustReadBlock(&tmpBlock) } ts.MustClose() } diff --git a/lib/vmselectapi/api.go b/lib/vmselectapi/api.go index 6fd8f498b2..2d77acc4b8 100644 --- a/lib/vmselectapi/api.go +++ b/lib/vmselectapi/api.go @@ -47,9 +47,7 @@ type BlockIterator interface { // NextBlock reads the next block into mb. // // It returns true on success, false on error or if no blocks to read. - // - // If fetchData is false, then only mb.MetricName is read. Otherwise mb.Block is also read. - NextBlock(mb *storage.MetricBlock, fetchData bool) bool + NextBlock(mb *storage.MetricBlock) bool // MustClose frees up resources allocated by BlockIterator. MustClose() diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go index c491e7da66..4c5d3b1845 100644 --- a/lib/vmselectapi/server.go +++ b/lib/vmselectapi/server.go @@ -474,7 +474,7 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error { func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error { switch rpcName { - case "search_v6": + case "search_v7": return s.processSeriesSearch(ctx) case "searchMetricNames_v3": return s.processSearchMetricNames(ctx) @@ -892,10 +892,6 @@ func (s *Server) processSeriesSearch(ctx *vmselectRequestCtx) error { if err := ctx.readSearchQuery(); err != nil { return err } - fetchData, err := ctx.readBool() - if err != nil { - return fmt.Errorf("cannot read `fetchData` bool: %w", err) - } // Initiaialize the search. startTime := time.Now() @@ -922,7 +918,7 @@ func (s *Server) processSeriesSearch(ctx *vmselectRequestCtx) error { // Send found blocks to vmselect. blocksRead := 0 - for bi.NextBlock(&ctx.mb, fetchData) { + for bi.NextBlock(&ctx.mb) { blocksRead++ s.metricBlocksRead.Inc() s.metricRowsRead.Add(ctx.mb.Block.RowsCount())