mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/netstorage: cleanup after 4805b80977
This commit is contained in:
parent
4805b80977
commit
1b90a091cf
1 changed files with 12 additions and 13 deletions
|
@ -1705,8 +1705,7 @@ func (sn *storageNode) processSearchMetricNames(requestData []byte, deadline sea
|
||||||
|
|
||||||
func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
|
func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
|
||||||
f := func(bc *handshake.BufferedConn) error {
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
_, err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock)
|
if err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -2250,25 +2249,25 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
|
||||||
|
|
||||||
const maxMetricNameSize = 64 * 1024
|
const maxMetricNameSize = 64 * 1024
|
||||||
|
|
||||||
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) {
|
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) error {
|
||||||
// Send the request to sn.
|
// Send the request to sn.
|
||||||
if err := writeBytes(bc, requestData); err != nil {
|
if err := writeBytes(bc, requestData); err != nil {
|
||||||
return 0, fmt.Errorf("cannot write requestData: %w", err)
|
return fmt.Errorf("cannot write requestData: %w", err)
|
||||||
}
|
}
|
||||||
if err := writeBool(bc, fetchData); err != nil {
|
if err := writeBool(bc, fetchData); err != nil {
|
||||||
return 0, fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err)
|
return fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err)
|
||||||
}
|
}
|
||||||
if err := bc.Flush(); err != nil {
|
if err := bc.Flush(); err != nil {
|
||||||
return 0, fmt.Errorf("cannot flush requestData to conn: %w", err)
|
return fmt.Errorf("cannot flush requestData to conn: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read response error.
|
// Read response error.
|
||||||
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot read error message: %w", err)
|
return fmt.Errorf("cannot read error message: %w", err)
|
||||||
}
|
}
|
||||||
if len(buf) > 0 {
|
if len(buf) > 0 {
|
||||||
return 0, newErrRemote(buf)
|
return newErrRemote(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read response. It may consist of multiple MetricBlocks.
|
// Read response. It may consist of multiple MetricBlocks.
|
||||||
|
@ -2277,24 +2276,24 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
|
||||||
for {
|
for {
|
||||||
buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
|
buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return blocksRead, fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err)
|
return fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err)
|
||||||
}
|
}
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
// Reached the end of the response
|
// Reached the end of the response
|
||||||
return blocksRead, nil
|
return nil
|
||||||
}
|
}
|
||||||
tail, err := mb.Unmarshal(buf)
|
tail, err := mb.Unmarshal(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err)
|
return fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err)
|
||||||
}
|
}
|
||||||
if len(tail) != 0 {
|
if len(tail) != 0 {
|
||||||
return blocksRead, fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)
|
return fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)
|
||||||
}
|
}
|
||||||
blocksRead++
|
blocksRead++
|
||||||
sn.metricBlocksRead.Inc()
|
sn.metricBlocksRead.Inc()
|
||||||
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
||||||
if err := processBlock(&mb); err != nil {
|
if err := processBlock(&mb); err != nil {
|
||||||
return blocksRead, fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
|
return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue