app/vmselect: optimize /api/v1/series by skipping storage data

Fetch and process only time series metainfo.
This commit is contained in:
Aliaksandr Valialkin 2019-08-04 22:15:33 +03:00
parent 7f5afae1e3
commit 880b1d80b1
14 changed files with 120 additions and 44 deletions

View file

@ -45,9 +45,10 @@ func (r *Result) reset() {
// Results holds results returned from ProcessSearchQuery.
type Results struct {
at *auth.Token
tr storage.TimeRange
deadline Deadline
at *auth.Token
tr storage.TimeRange
fetchData bool
deadline Deadline
tbf *tmpBlocksFile
@ -100,10 +101,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
break
}
if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.at, maxWorkersCount); err != nil {
if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, rss.at, maxWorkersCount); err != nil {
break
}
if len(rs.Timestamps) == 0 {
if len(rs.Timestamps) == 0 && rss.fetchData {
// Skip empty blocks.
continue
}
@ -146,7 +147,7 @@ type packedTimeseries struct {
}
// Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, at *auth.Token, maxWorkersCount int) error {
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token, maxWorkersCount int) error {
dst.reset()
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
@ -173,7 +174,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
var err error
for addr := range workCh {
sb := getSortBlock()
if err = sb.unpackFrom(tbf, addr, tr, at); err != nil {
if err = sb.unpackFrom(tbf, addr, tr, fetchData, at); err != nil {
break
}
@ -292,10 +293,12 @@ func (sb *sortBlock) reset() {
sb.NextIdx = 0
}
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, at *auth.Token) error {
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
tbf.MustReadBlockAt(&sb.b, addr)
if err := sb.b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %s", err)
if fetchData {
if err := sb.b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %s", err)
}
}
timestamps := sb.b.Timestamps()
@ -692,7 +695,7 @@ func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
}
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (*Results, bool, error) {
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
@ -708,7 +711,7 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadli
for _, sn := range storageNodes {
go func(sn *storageNode) {
sn.searchRequests.Inc()
results, err := sn.processSearchQuery(requestData, tr, deadline)
results, err := sn.processSearchQuery(requestData, tr, fetchData, deadline)
if err != nil {
sn.searchRequestErrors.Inc()
err = fmt.Errorf("cannot perform search on vmstorage %s: %s", sn.connPool.Addr(), err)
@ -769,6 +772,7 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, deadline Deadli
rss.packedTimeseries = make([]packedTimeseries, len(m))
rss.at = at
rss.tr = tr
rss.fetchData = fetchData
rss.deadline = deadline
rss.tbf = tbf
i := 0
@ -931,20 +935,20 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead
return n, nil
}
func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, deadline Deadline) ([]*storage.MetricBlock, error) {
func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) ([]*storage.MetricBlock, error) {
var results []*storage.MetricBlock
f := func(bc *handshake.BufferedConn) error {
rs, err := sn.processSearchQueryOnConn(bc, requestData, tr)
rs, err := sn.processSearchQueryOnConn(bc, requestData, tr, fetchData)
if err != nil {
return err
}
results = rs
return nil
}
if err := sn.execOnConn("search_v2", f, deadline); err != nil {
if err := sn.execOnConn("search_v3", f, deadline); err != nil {
// Try again before giving up.
results = nil
if err = sn.execOnConn("search_v2", f, deadline); err != nil {
if err = sn.execOnConn("search_v3", f, deadline); err != nil {
return nil, err
}
}
@ -1197,11 +1201,14 @@ const maxMetricBlockSize = 1024 * 1024
// from vmstorage.
const maxErrorMessageSize = 64 * 1024
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange) ([]*storage.MetricBlock, error) {
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) ([]*storage.MetricBlock, error) {
// Send the request to sn.
if err := writeBytes(bc, requestData); err != nil {
return nil, fmt.Errorf("cannot write requestData: %s", err)
}
if err := writeBool(bc, fetchData); err != nil {
return nil, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush requestData to conn: %s", err)
}
@ -1265,6 +1272,17 @@ func writeUint32(bc *handshake.BufferedConn, n uint32) error {
return nil
}
func writeBool(bc *handshake.BufferedConn, b bool) error {
var buf [1]byte
if b {
buf[0] = 1
}
if _, err := bc.Write(buf[:]); err != nil {
return err
}
return nil
}
func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, error) {
buf = bytesutil.Resize(buf, 8)
if _, err := io.ReadFull(bc, buf); err != nil {

View file

@ -72,7 +72,7 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}
@ -169,7 +169,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}
@ -405,7 +405,7 @@ func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, false, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}

View file

@ -578,8 +578,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
MaxTimestamp: ec.End + ec.Step,
TagFilterss: [][]storage.TagFilter{me.TagFilters},
}
rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline)
rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, true, ec.Deadline)
if err != nil {
return nil, err
}

View file

@ -392,6 +392,18 @@ func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
return nil
}
func (ctx *vmselectRequestCtx) readBool() (bool, error) {
ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, 1)
if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil {
if err == io.EOF {
return false, err
}
return false, fmt.Errorf("cannot read bool: %s", err)
}
v := ctx.dataBuf[0] != 0
return v, nil
}
func (ctx *vmselectRequestCtx) writeDataBufBytes() error {
if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil {
return fmt.Errorf("cannot write data size: %s", err)
@ -443,7 +455,7 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
}()
switch string(ctx.dataBuf) {
case "search_v2":
case "search_v3":
return s.processVMSelectSearchQuery(ctx)
case "labelValues":
return s.processVMSelectLabelValues(ctx)
@ -714,6 +726,10 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
if len(tail) > 0 {
return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail)
}
fetchData, err := ctx.readBool()
if err != nil {
return fmt.Errorf("cannot read `fetchData` bool: %s", err)
}
// Setup search.
if err := ctx.setupTfss(); err != nil {
@ -728,7 +744,7 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch)
ctx.sr.Init(s.storage, ctx.tfss, tr, fetchData, *maxMetricsPerSearch)
defer ctx.sr.MustClose()
if err := ctx.sr.Error(); err != nil {
// Send the error message to vmselect.

View file

@ -28,6 +28,9 @@ type partSearch struct {
// tr is a time range to search.
tr TimeRange
// Skip populating timestampsData and valuesData in Block if fetchData=false.
fetchData bool
metaindex []metaindexRow
ibCache *indexBlockCache
@ -61,7 +64,7 @@ func (ps *partSearch) reset() {
}
// Init initializes the ps with the given p, tsids and tr.
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
ps.reset()
ps.p = p
@ -72,6 +75,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
ps.tsids = append(ps.tsids[:0], tsids...)
}
ps.tr = tr
ps.fetchData = fetchData
ps.metaindex = p.metaindex
ps.ibCache = &p.ibCache
@ -281,11 +285,14 @@ func (ps *partSearch) searchBHS() bool {
func (ps *partSearch) readBlock(bh *blockHeader) {
ps.Block.Reset()
ps.Block.bh = *bh
if !ps.fetchData {
return
}
ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
ps.p.timestampsFile.ReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
ps.p.valuesFile.ReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
ps.Block.bh = *bh
}

View file

@ -1247,7 +1247,7 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR
func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
var ps partSearch
ps.Init(p, tsids, tr)
ps.Init(p, tsids, tr, true)
var bs []Block
for ps.NextBlock() {
var b Block

View file

@ -56,7 +56,7 @@ func (pts *partitionSearch) reset() {
// Init initializes the search in the given partition for the given tsid and tr.
//
// MustClose must be called when partition search is done.
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
if pts.needClosing {
logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
}
@ -85,7 +85,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
}
pts.psPool = pts.psPool[:len(pts.pws)]
for i, pw := range pts.pws {
pts.psPool[i].Init(pw.p, tsids, tr)
pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
}
// Initialize the psHeap.

View file

@ -238,7 +238,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
bs := []Block{}
var pts partitionSearch
pts.Init(pt, tsids, tr)
pts.Init(pt, tsids, tr, true)
for pts.NextBlock() {
var b Block
b.CopyFrom(pts.Block)
@ -263,7 +263,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
}
// verify that empty tsids returns empty result
pts.Init(pt, []TSID{}, tr)
pts.Init(pt, []TSID{}, tr, true)
if pts.NextBlock() {
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
}
@ -271,6 +271,16 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
}
pts.MustClose()
pts.Init(pt, []TSID{}, tr, false)
if pts.NextBlock() {
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
}
if err := pts.Error(); err != nil {
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
}
pts.MustClose()
return nil
}

View file

@ -110,7 +110,7 @@ func (s *Search) reset() {
// Init initializes s from the given storage, tfss and tr.
//
// MustClose must be called when the search is done.
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
if s.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
@ -123,7 +123,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMet
// It is ok to call Init on error from storage.searchTSIDs.
// Init must be called before returning because it will fail
// on Seach.MustClose otherwise.
s.ts.Init(storage.tb, tsids, tr)
s.ts.Init(storage.tb, tsids, tr, fetchData)
if err != nil {
s.err = err

View file

@ -200,7 +200,7 @@ func testSearch(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) e
}
// Search
s.Init(st, []*TagFilters{tfs}, tr, 1e5)
s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
var mbs []MetricBlock
for s.NextMetricBlock() {
var b Block

View file

@ -506,12 +506,24 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
MaxTimestamp: 2e10,
}
metricBlocksCount := func(tfs *TagFilters) int {
// Verify the number of blocks with fetchData=true
n := 0
sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
for sr.NextMetricBlock() {
n++
}
sr.MustClose()
// Make sure the number of blocks with fetchData=false is the same.
m := 0
sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
for sr.NextMetricBlock() {
m++
}
sr.MustClose()
if n != m {
return -1
}
return n
}
for i := 0; i < metricsCount; i++ {

View file

@ -55,7 +55,7 @@ func (ts *tableSearch) reset() {
// Init initializes the ts.
//
// MustClose must be called then the tableSearch is done.
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
if ts.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
@ -86,7 +86,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
}
ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
for i, ptw := range ts.ptws {
ts.ptsPool[i].Init(ptw.pt, tsids, tr)
ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
}
// Initialize the ptsHeap.

View file

@ -251,7 +251,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
bs := []Block{}
var ts tableSearch
ts.Init(tb, tsids, tr)
ts.Init(tb, tsids, tr, true)
for ts.NextBlock() {
var b Block
b.CopyFrom(ts.Block)
@ -276,7 +276,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
}
// verify that empty tsids returns empty result
ts.Init(tb, []TSID{}, tr)
ts.Init(tb, []TSID{}, tr, true)
if ts.NextBlock() {
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
}
@ -284,5 +284,15 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
}
ts.MustClose()
ts.Init(tb, []TSID{}, tr, false)
if ts.NextBlock() {
return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
}
ts.MustClose()
return nil
}

View file

@ -26,7 +26,11 @@ func BenchmarkTableSearch(b *testing.B) {
b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) {
for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} {
b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) {
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch)
for _, fetchData := range []bool{true, false} {
b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) {
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch, fetchData)
})
}
})
}
})
@ -103,9 +107,9 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
tb.MustClose()
}
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
rowsPerInsert := int(maxRawRowsPerPartition)
rowsPerInsert := getMaxRawRowsPerPartition()
tb := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
tr := TimeRange{
@ -127,7 +131,7 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int)
for i := range tsids {
tsids[i].MetricID = 1 + uint64(i)
}
ts.Init(tb, tsids, tr)
ts.Init(tb, tsids, tr, fetchData)
for ts.NextBlock() {
}
ts.MustClose()