mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
27a5461785
The buffered connection could have exceeded the underlying connection deadline during reading or writing to an internal buffer. With this change, buffered connection struct additionally checks for a deadline in Read/Write methods. Signed-off-by: hagen1778 <roman@victoriametrics.com>
108 lines
2.4 KiB
Go
108 lines
2.4 KiB
Go
package handshake
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
|
)
|
|
|
|
type bufferedWriter interface {
|
|
Write(p []byte) (int, error)
|
|
Flush() error
|
|
}
|
|
|
|
// BufferedConn is a net.Conn with Flush suport.
|
|
type BufferedConn struct {
|
|
net.Conn
|
|
|
|
br io.Reader
|
|
bw bufferedWriter
|
|
|
|
deadline time.Time
|
|
}
|
|
|
|
const bufferSize = 64 * 1024
|
|
|
|
// newBufferedConn returns buffered connection with the given compression level.
|
|
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
|
|
bc := &BufferedConn{
|
|
Conn: c,
|
|
}
|
|
if compressionLevel <= 0 {
|
|
bc.bw = bufio.NewWriterSize(c, bufferSize)
|
|
} else {
|
|
bc.bw = zstd.NewWriterLevel(c, compressionLevel)
|
|
}
|
|
if !isReadCompressed {
|
|
bc.br = bufio.NewReaderSize(c, bufferSize)
|
|
} else {
|
|
bc.br = zstd.NewReader(c)
|
|
}
|
|
return bc
|
|
}
|
|
|
|
// SetDeadline sets the read and write deadlines associated with the connection.
|
|
// Deadline is checked on each Read call.
|
|
func (bc *BufferedConn) SetDeadline(t time.Time) error {
|
|
bc.deadline = t
|
|
return bc.Conn.SetDeadline(t)
|
|
}
|
|
|
|
// Read reads up to len(p) from bc to p.
|
|
func (bc *BufferedConn) Read(p []byte) (int, error) {
|
|
startTime := time.Now()
|
|
if startTime.After(bc.deadline) {
|
|
return 0, os.ErrDeadlineExceeded
|
|
}
|
|
n, err := bc.br.Read(p)
|
|
if err != nil && err != io.EOF {
|
|
err = fmt.Errorf("cannot read data in %.3f seconds: %w", time.Since(startTime).Seconds(), err)
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Write writes p to bc.
|
|
//
|
|
// Do not forget to call Flush if needed.
|
|
func (bc *BufferedConn) Write(p []byte) (int, error) {
|
|
startTime := time.Now()
|
|
if startTime.After(bc.deadline) {
|
|
return 0, os.ErrDeadlineExceeded
|
|
}
|
|
n, err := bc.bw.Write(p)
|
|
if err != nil {
|
|
err = fmt.Errorf("cannot write data in %.3f seconds: %w", time.Since(startTime).Seconds(), err)
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Close closes bc.
|
|
func (bc *BufferedConn) Close() error {
|
|
// Close the Conn at first. It is expected that all the required data
|
|
// is already flushed to the Conn.
|
|
err := bc.Conn.Close()
|
|
bc.Conn = nil
|
|
|
|
if zr, ok := bc.br.(*zstd.Reader); ok {
|
|
zr.Release()
|
|
}
|
|
bc.br = nil
|
|
|
|
if zw, ok := bc.bw.(*zstd.Writer); ok {
|
|
// Do not call zw.Close(), since we already closed the underlying conn.
|
|
zw.Release()
|
|
}
|
|
bc.bw = nil
|
|
|
|
return err
|
|
}
|
|
|
|
// Flush flushes internal write buffers to the underlying conn.
|
|
func (bc *BufferedConn) Flush() error {
|
|
return bc.bw.Flush()
|
|
}
|