app/vmselect/netstorage: replace sorting of packedTimeseries with the natural order of the fetched blocks

This should minimize the number of disk seeks when reading data from the temporary file.
This commit is contained in:
Aliaksandr Valialkin 2020-04-26 16:25:35 +03:00
parent 6954d0edb7
commit fcf57f9883

View file

@ -503,6 +503,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
tbf := getTmpBlocksFile() tbf := getTmpBlocksFile()
m := make(map[string][]tmpBlockAddr) m := make(map[string][]tmpBlockAddr)
var orderedMetricNames []string
blocksRead := 0 blocksRead := 0
bb := tmpBufPool.Get() bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb) defer tmpBufPool.Put(bb)
@ -519,7 +520,11 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String()) return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
} }
metricName := sr.MetricBlock.MetricName metricName := sr.MetricBlock.MetricName
m[string(metricName)] = append(m[string(metricName)], addr) addrs := m[string(metricName)]
if len(addrs) == 0 {
orderedMetricNames = append(orderedMetricNames, string(metricName))
}
m[string(metricName)] = append(addrs, addr)
} }
if err := sr.Error(); err != nil { if err := sr.Error(); err != nil {
putTmpBlocksFile(tbf) putTmpBlocksFile(tbf)
@ -531,27 +536,18 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
} }
var rss Results var rss Results
rss.packedTimeseries = make([]packedTimeseries, len(m))
rss.tr = tr rss.tr = tr
rss.fetchData = fetchData rss.fetchData = fetchData
rss.deadline = deadline rss.deadline = deadline
rss.tbf = tbf rss.tbf = tbf
i := 0 pts := make([]packedTimeseries, len(orderedMetricNames))
for metricName, addrs := range m { for i, metricName := range orderedMetricNames {
pts := &rss.packedTimeseries[i] pts[i] = packedTimeseries{
i++ metricName: metricName,
pts.metricName = metricName addrs: m[metricName],
pts.addrs = addrs }
} }
rss.packedTimeseries = pts
// Sort rss.packedTimeseries by the first addr offset in order
// to reduce the number of disk seeks during unpacking in RunParallel.
// In this case tmpBlocksFile must be read almost sequentially.
sort.Slice(rss.packedTimeseries, func(i, j int) bool {
pts := rss.packedTimeseries
return pts[i].addrs[0].offset < pts[j].addrs[0].offset
})
return &rss, nil return &rss, nil
} }