From 1b90a091cfbc98abd512874fc74b1a12602148d7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 24 Feb 2021 11:43:09 +0200 Subject: [PATCH] app/vmselect/netstorage: cleanup after 4805b80977b7992d3f49e9be71726bc0c7690363 --- app/vmselect/netstorage/netstorage.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 1014fc993..ee244e490 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1705,8 +1705,7 @@ func (sn *storageNode) processSearchMetricNames(requestData []byte, deadline sea func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error { f := func(bc *handshake.BufferedConn) error { - _, err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock) - if err != nil { + if err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock); err != nil { return err } return nil @@ -2250,25 +2249,25 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn const maxMetricNameSize = 64 * 1024 -func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) { +func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) error { // Send the request to sn. if err := writeBytes(bc, requestData); err != nil { - return 0, fmt.Errorf("cannot write requestData: %w", err) + return fmt.Errorf("cannot write requestData: %w", err) } if err := writeBool(bc, fetchData); err != nil { - return 0, fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err) + return fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err) } if err := bc.Flush(); err != nil { - return 0, fmt.Errorf("cannot flush requestData to conn: %w", err) + return fmt.Errorf("cannot flush requestData to conn: %w", err) } // Read response error. buf, err := readBytes(nil, bc, maxErrorMessageSize) if err != nil { - return 0, fmt.Errorf("cannot read error message: %w", err) + return fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return 0, newErrRemote(buf) + return newErrRemote(buf) } // Read response. It may consist of multiple MetricBlocks. @@ -2277,24 +2276,24 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ for { buf, err = readBytes(buf[:0], bc, maxMetricBlockSize) if err != nil { - return blocksRead, fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err) + return fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err) } if len(buf) == 0 { // Reached the end of the response - return blocksRead, nil + return nil } tail, err := mb.Unmarshal(buf) if err != nil { - return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err) + return fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err) } if len(tail) != 0 { - return blocksRead, fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail) + return fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail) } blocksRead++ sn.metricBlocksRead.Inc() sn.metricRowsRead.Add(mb.Block.RowsCount()) if err := processBlock(&mb); err != nil { - return blocksRead, fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err) + return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err) } } }