diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bf40ba46d5..a2dab05747 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -101,6 +101,7 @@ Released at 2023-11-15 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_errors_total` metric if all retries to send remote-write request failed. Before, this metric was incremented only if remote-write client's buffer is overloaded. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_dropped_rows_total` and `vmalert_remotewrite_dropped_bytes_total` metrics if remote-write client's buffer is overloaded. Before, these metrics were incremented only after unsuccessful HTTP calls. * BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087). +* BUGFIX: improve deadline detection when using a buffered connection for communication between cluster components. Before, due to the nature of a buffered connection, the deadline could have been exceeded while reading or writing data to the connection. * BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component. * BUGFIX: dashboards: respect `job` and `instance` filters for `alerts` annotation in cluster and single-node dashboards. * BUGFIX: dashboards: update description for RSS and anonymous memory panels to be consistent for single-node, cluster and vmagent dashboards. 
diff --git a/lib/handshake/buffered_conn.go b/lib/handshake/buffered_conn.go index 0af1d5964d..d42a2d6d20 100644 --- a/lib/handshake/buffered_conn.go +++ b/lib/handshake/buffered_conn.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "os" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" @@ -21,6 +22,8 @@ type BufferedConn struct { br io.Reader bw bufferedWriter + + deadline time.Time } const bufferSize = 64 * 1024 @@ -43,9 +46,19 @@ func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *B return bc } +// SetDeadline sets the read and write deadlines associated with the connection. +// A non-zero deadline is also checked before each Read and Write call; a zero t disables the check. +func (bc *BufferedConn) SetDeadline(t time.Time) error { + bc.deadline = t + return bc.Conn.SetDeadline(t) +} + // Read reads up to len(p) from bc to p. func (bc *BufferedConn) Read(p []byte) (int, error) { startTime := time.Now() + if !bc.deadline.IsZero() && startTime.After(bc.deadline) { + return 0, os.ErrDeadlineExceeded + } n, err := bc.br.Read(p) if err != nil && err != io.EOF { err = fmt.Errorf("cannot read data in %.3f seconds: %w", time.Since(startTime).Seconds(), err) @@ -58,6 +71,9 @@ func (bc *BufferedConn) Read(p []byte) (int, error) { // Do not forget to call Flush if needed. func (bc *BufferedConn) Write(p []byte) (int, error) { startTime := time.Now() + if !bc.deadline.IsZero() && startTime.After(bc.deadline) { + return 0, os.ErrDeadlineExceeded + } n, err := bc.bw.Write(p) if err != nil { err = fmt.Errorf("cannot write data in %.3f seconds: %w", time.Since(startTime).Seconds(), err)