app/vmselect/netstorage: reduce memory usage when fetching a big number of data blocks from vmstorage

Dump data blocks directly to a temporary file instead of buffering them in RAM.
Aliaksandr Valialkin 2019-09-28 12:20:50 +03:00
parent ba460f62e6
commit 56dff57f77


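The core idea of the change below: stream each MetricBlock to a shared temporary file as it arrives from a vmstorage node, keeping only small (offset, size) addresses in RAM instead of the block payloads themselves. Here is a minimal self-contained sketch of that spill-to-disk pattern; all names in it (tmpBlocksWriter, blockAddr, writeBlock) are illustrative stand-ins, not the VictoriaMetrics API, which uses tmpBlocksFile/tmpBlockAddr as shown in the diff.

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// blockAddr records where a block landed inside the temporary file.
// Keeping addresses instead of payloads bounds RAM usage by the number
// of blocks, not by their total size.
type blockAddr struct {
	offset int64
	size   int
}

// tmpBlocksWriter appends blocks to a temporary file and keeps only
// their addresses in memory, grouped by metric name.
type tmpBlocksWriter struct {
	mu     sync.Mutex
	f      *os.File
	offset int64
	m      map[string][]blockAddr
}

func newTmpBlocksWriter() (*tmpBlocksWriter, error) {
	f, err := os.CreateTemp("", "blocks-*.tmp")
	if err != nil {
		return nil, err
	}
	return &tmpBlocksWriter{f: f, m: make(map[string][]blockAddr)}, nil
}

// writeBlock is safe for concurrent use, mirroring how per-node
// goroutines call tmpBlocksFileWrapper.WriteBlock in the diff below.
func (w *tmpBlocksWriter) writeBlock(metricName string, data []byte) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if _, err := w.f.Write(data); err != nil {
		return err
	}
	w.m[metricName] = append(w.m[metricName], blockAddr{offset: w.offset, size: len(data)})
	w.offset += int64(len(data))
	return nil
}

func main() {
	w, err := newTmpBlocksWriter()
	if err != nil {
		panic(err)
	}
	defer os.Remove(w.f.Name())
	defer w.f.Close()

	_ = w.writeBlock("http_requests_total", []byte("block #1 payload"))
	_ = w.writeBlock("http_requests_total", []byte("block #2 payload"))

	// RAM holds only the small addresses; the payloads live on disk.
	fmt.Printf("%+v\n", w.m)
}
```

A single mutex serializing the writes is cheap relative to the network reads feeding them, and the address map is later used to unpack each series' blocks from the file.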
@@ -694,64 +694,67 @@ func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
 	return n, isPartialResult, nil
 }
 
+type tmpBlocksFileWrapper struct {
+	mu  sync.Mutex
+	tbf *tmpBlocksFile
+	m   map[string][]tmpBlockAddr
+}
+
+func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
+	tbfw.mu.Lock()
+	defer tbfw.mu.Unlock()
+	addr, err := tbfw.tbf.WriteBlock(mb.Block)
+	if err != nil {
+		return err
+	}
+	metricName := mb.MetricName
+	tbfw.m[string(metricName)] = append(tbfw.m[string(metricName)], addr)
+	return nil
+}
+
 // ProcessSearchQuery performs sq on storage nodes until the given deadline.
 func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
 	requestData := sq.Marshal(nil)
 
 	// Send the query to all the storage nodes in parallel.
-	type nodeResult struct {
-		results []*storage.MetricBlock
-		err     error
-	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan error, len(storageNodes))
 	tr := storage.TimeRange{
 		MinTimestamp: sq.MinTimestamp,
 		MaxTimestamp: sq.MaxTimestamp,
 	}
+	tbfw := &tmpBlocksFileWrapper{
+		tbf: getTmpBlocksFile(),
+		m:   make(map[string][]tmpBlockAddr),
+	}
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.searchRequests.Inc()
-			results, err := sn.processSearchQuery(requestData, tr, fetchData, deadline)
+			err := sn.processSearchQuery(tbfw, requestData, tr, fetchData, deadline)
 			if err != nil {
 				sn.searchRequestErrors.Inc()
 				err = fmt.Errorf("cannot perform search on vmstorage %s: %s", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
-				results: results,
-				err:     err,
-			}
+			resultsCh <- err
 		}(sn)
 	}
 
 	// Collect results.
 	var errors []error
-	tbf := getTmpBlocksFile()
-	m := make(map[string][]tmpBlockAddr)
-	blocksRead := 0
 	for i := 0; i < len(storageNodes); i++ {
 		// There is no need in timer here, since all the goroutines executing
 		// sn.processSearchQuery must be finished until the deadline.
-		nr := <-resultsCh
-		if nr.err != nil {
-			errors = append(errors, nr.err)
+		err := <-resultsCh
+		if err != nil {
+			errors = append(errors, err)
 			continue
 		}
-		for _, mb := range nr.results {
-			blocksRead++
-			addr, err := tbf.WriteBlock(mb.Block)
-			if err != nil {
-				errors = append(errors, fmt.Errorf("cannot write data block #%d to temporary blocks file: %s", blocksRead, err))
-				break
-			}
-			metricName := mb.MetricName
-			m[string(metricName)] = append(m[string(metricName)], addr)
-		}
 	}
 	isPartialResult := false
 	if len(errors) > 0 {
-		if len(m) == 0 {
+		if len(tbfw.m) == 0 {
 			// Return only the first error, since it has no sense in returning all errors.
-			putTmpBlocksFile(tbf)
+			putTmpBlocksFile(tbfw.tbf)
 			return nil, true, fmt.Errorf("error occured during search: %s", errors[0])
 		}
@@ -763,20 +766,20 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool,
 		partialSearchResults.Inc()
 		isPartialResult = true
 	}
 
-	if err := tbf.Finalize(); err != nil {
-		putTmpBlocksFile(tbf)
-		return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d blocks: %s", blocksRead, err)
+	if err := tbfw.tbf.Finalize(); err != nil {
+		putTmpBlocksFile(tbfw.tbf)
+		return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %s", len(tbfw.m), err)
 	}
 
 	var rss Results
-	rss.packedTimeseries = make([]packedTimeseries, len(m))
+	rss.packedTimeseries = make([]packedTimeseries, len(tbfw.m))
 	rss.at = at
 	rss.tr = tr
 	rss.fetchData = fetchData
 	rss.deadline = deadline
-	rss.tbf = tbf
+	rss.tbf = tbfw.tbf
 	i := 0
-	for metricName, addrs := range m {
+	for metricName, addrs := range tbfw.m {
 		pts := &rss.packedTimeseries[i]
 		i++
 		pts.metricName = metricName
@@ -935,24 +938,23 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead
 	return n, nil
 }
 
-func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) ([]*storage.MetricBlock, error) {
-	var results []*storage.MetricBlock
+func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) error {
+	var blocksRead int
 	f := func(bc *handshake.BufferedConn) error {
-		rs, err := sn.processSearchQueryOnConn(bc, requestData, tr, fetchData)
+		n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData)
 		if err != nil {
 			return err
 		}
-		results = rs
+		blocksRead = n
 		return nil
 	}
-	if err := sn.execOnConn("search_v3", f, deadline); err != nil {
-		// Try again before giving up.
-		results = nil
+	if err := sn.execOnConn("search_v3", f, deadline); err != nil && blocksRead == 0 {
+		// Try again before giving up if zero blocks read on the previous attempt.
 		if err = sn.execOnConn("search_v3", f, deadline); err != nil {
-			return nil, err
+			return err
 		}
 	}
-	return results, nil
+	return nil
 }
 
 func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline Deadline) error {
@@ -1201,16 +1203,16 @@ const maxMetricBlockSize = 1024 * 1024
 // from vmstorage.
 const maxErrorMessageSize = 64 * 1024
 
-func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) ([]*storage.MetricBlock, error) {
+func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) (int, error) {
 	// Send the request to sn.
 	if err := writeBytes(bc, requestData); err != nil {
-		return nil, fmt.Errorf("cannot write requestData: %s", err)
+		return 0, fmt.Errorf("cannot write requestData: %s", err)
 	}
 	if err := writeBool(bc, fetchData); err != nil {
-		return nil, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err)
+		return 0, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err)
 	}
 	if err := bc.Flush(); err != nil {
-		return nil, fmt.Errorf("cannot flush requestData to conn: %s", err)
+		return 0, fmt.Errorf("cannot flush requestData to conn: %s", err)
 	}
 
 	var err error
@@ -1219,37 +1221,38 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
 	// Read response error.
 	buf, err = readBytes(buf[:0], bc, maxErrorMessageSize)
 	if err != nil {
-		return nil, fmt.Errorf("cannot read error message: %s", err)
+		return 0, fmt.Errorf("cannot read error message: %s", err)
 	}
 	if len(buf) > 0 {
-		return nil, &errRemote{msg: string(buf)}
+		return 0, &errRemote{msg: string(buf)}
 	}
 
 	// Read response. It may consist of multiple MetricBlocks.
-	var results []*storage.MetricBlock
-	metricBlocksRead := 0
+	blocksRead := 0
 	for {
 		buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
 		if err != nil {
-			return nil, fmt.Errorf("cannot read MetricBlock #%d: %s", metricBlocksRead, err)
+			return blocksRead, fmt.Errorf("cannot read MetricBlock #%d: %s", blocksRead, err)
 		}
 		if len(buf) == 0 {
 			// Reached the end of the response
-			return results, nil
+			return blocksRead, nil
 		}
 		var mb storage.MetricBlock
 		mb.Block = &storage.Block{}
 		tail, err := mb.Unmarshal(buf)
 		if err != nil {
-			return nil, fmt.Errorf("cannot unmarshal MetricBlock: %s", err)
+			return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %s", blocksRead, err)
 		}
 		if len(tail) != 0 {
-			return nil, fmt.Errorf("non-empty tail after unmarshaling MetricBlock: (len=%d) %q", len(tail), tail)
+			return blocksRead, fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)
		}
-		metricBlocksRead++
+		blocksRead++
 		sn.metricBlocksRead.Inc()
 		sn.metricRowsRead.Add(mb.Block.RowsCount())
-		results = append(results, &mb)
+		if err := tbfw.WriteBlock(&mb); err != nil {
+			return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %s", blocksRead, err)
+		}
 	}
 }