mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
313 lines
6 KiB
Go
313 lines
6 KiB
Go
package fasthttputil
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// NewPipeConns returns new bi-directional connection pipe.
|
|
//
|
|
// PipeConns is NOT safe for concurrent use by multiple goroutines!
|
|
func NewPipeConns() *PipeConns {
|
|
ch1 := make(chan *byteBuffer, 4)
|
|
ch2 := make(chan *byteBuffer, 4)
|
|
|
|
pc := &PipeConns{
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
pc.c1.rCh = ch1
|
|
pc.c1.wCh = ch2
|
|
pc.c2.rCh = ch2
|
|
pc.c2.wCh = ch1
|
|
pc.c1.pc = pc
|
|
pc.c2.pc = pc
|
|
return pc
|
|
}
|
|
|
|
// PipeConns provides bi-directional connection pipe,
|
|
// which use in-process memory as a transport.
|
|
//
|
|
// PipeConns must be created by calling NewPipeConns.
|
|
//
|
|
// PipeConns has the following additional features comparing to connections
|
|
// returned from net.Pipe():
|
|
//
|
|
// * It is faster.
|
|
// * It buffers Write calls, so there is no need to have concurrent goroutine
|
|
// calling Read in order to unblock each Write call.
|
|
// * It supports read and write deadlines.
|
|
//
|
|
// PipeConns is NOT safe for concurrent use by multiple goroutines!
|
|
type PipeConns struct {
|
|
c1 pipeConn
|
|
c2 pipeConn
|
|
stopCh chan struct{}
|
|
stopChLock sync.Mutex
|
|
}
|
|
|
|
// Conn1 returns the first end of bi-directional pipe.
|
|
//
|
|
// Data written to Conn1 may be read from Conn2.
|
|
// Data written to Conn2 may be read from Conn1.
|
|
func (pc *PipeConns) Conn1() net.Conn {
|
|
return &pc.c1
|
|
}
|
|
|
|
// Conn2 returns the second end of bi-directional pipe.
|
|
//
|
|
// Data written to Conn2 may be read from Conn1.
|
|
// Data written to Conn1 may be read from Conn2.
|
|
func (pc *PipeConns) Conn2() net.Conn {
|
|
return &pc.c2
|
|
}
|
|
|
|
// Close closes pipe connections.
|
|
func (pc *PipeConns) Close() error {
|
|
pc.stopChLock.Lock()
|
|
select {
|
|
case <-pc.stopCh:
|
|
default:
|
|
close(pc.stopCh)
|
|
}
|
|
pc.stopChLock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
type pipeConn struct {
|
|
b *byteBuffer
|
|
bb []byte
|
|
|
|
rCh chan *byteBuffer
|
|
wCh chan *byteBuffer
|
|
pc *PipeConns
|
|
|
|
readDeadlineTimer *time.Timer
|
|
writeDeadlineTimer *time.Timer
|
|
|
|
readDeadlineCh <-chan time.Time
|
|
writeDeadlineCh <-chan time.Time
|
|
|
|
readDeadlineChLock sync.Mutex
|
|
}
|
|
|
|
func (c *pipeConn) Write(p []byte) (int, error) {
|
|
b := acquireByteBuffer()
|
|
b.b = append(b.b[:0], p...)
|
|
|
|
select {
|
|
case <-c.pc.stopCh:
|
|
releaseByteBuffer(b)
|
|
return 0, errConnectionClosed
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case c.wCh <- b:
|
|
default:
|
|
select {
|
|
case c.wCh <- b:
|
|
case <-c.writeDeadlineCh:
|
|
c.writeDeadlineCh = closedDeadlineCh
|
|
return 0, ErrTimeout
|
|
case <-c.pc.stopCh:
|
|
releaseByteBuffer(b)
|
|
return 0, errConnectionClosed
|
|
}
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|
|
|
|
func (c *pipeConn) Read(p []byte) (int, error) {
|
|
mayBlock := true
|
|
nn := 0
|
|
for len(p) > 0 {
|
|
n, err := c.read(p, mayBlock)
|
|
nn += n
|
|
if err != nil {
|
|
if !mayBlock && err == errWouldBlock {
|
|
err = nil
|
|
}
|
|
return nn, err
|
|
}
|
|
p = p[n:]
|
|
mayBlock = false
|
|
}
|
|
|
|
return nn, nil
|
|
}
|
|
|
|
func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
|
|
if len(c.bb) == 0 {
|
|
if err := c.readNextByteBuffer(mayBlock); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
n := copy(p, c.bb)
|
|
c.bb = c.bb[n:]
|
|
|
|
return n, nil
|
|
}
|
|
|
|
func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
|
|
releaseByteBuffer(c.b)
|
|
c.b = nil
|
|
|
|
select {
|
|
case c.b = <-c.rCh:
|
|
default:
|
|
if !mayBlock {
|
|
return errWouldBlock
|
|
}
|
|
c.readDeadlineChLock.Lock()
|
|
readDeadlineCh := c.readDeadlineCh
|
|
c.readDeadlineChLock.Unlock()
|
|
select {
|
|
case c.b = <-c.rCh:
|
|
case <-readDeadlineCh:
|
|
c.readDeadlineChLock.Lock()
|
|
c.readDeadlineCh = closedDeadlineCh
|
|
c.readDeadlineChLock.Unlock()
|
|
// rCh may contain data when deadline is reached.
|
|
// Read the data before returning ErrTimeout.
|
|
select {
|
|
case c.b = <-c.rCh:
|
|
default:
|
|
return ErrTimeout
|
|
}
|
|
case <-c.pc.stopCh:
|
|
// rCh may contain data when stopCh is closed.
|
|
// Read the data before returning EOF.
|
|
select {
|
|
case c.b = <-c.rCh:
|
|
default:
|
|
return io.EOF
|
|
}
|
|
}
|
|
}
|
|
|
|
c.bb = c.b.b
|
|
return nil
|
|
}
|
|
|
|
// errWouldBlock is returned by readNextByteBuffer when no data is queued
// and the caller asked not to block.
var errWouldBlock = errors.New("would block")

// errConnectionClosed is returned by Write when the pipe has been closed.
var errConnectionClosed = errors.New("connection closed")
|
|
|
|
// timeoutError is the concrete type of ErrTimeout.
type timeoutError struct{}

// Error implements the error interface.
func (*timeoutError) Error() string { return "timeout" }

// Timeout reports that the error is a timeout.
//
// Only the Timeout() function of the net.Error interface is implemented.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (*timeoutError) Timeout() bool { return true }

// ErrTimeout is returned from Read() or Write() on timeout.
var ErrTimeout = &timeoutError{}
|
|
|
|
func (c *pipeConn) Close() error {
|
|
return c.pc.Close()
|
|
}
|
|
|
|
func (c *pipeConn) LocalAddr() net.Addr {
|
|
return pipeAddr(0)
|
|
}
|
|
|
|
func (c *pipeConn) RemoteAddr() net.Addr {
|
|
return pipeAddr(0)
|
|
}
|
|
|
|
func (c *pipeConn) SetDeadline(deadline time.Time) error {
|
|
c.SetReadDeadline(deadline) //nolint:errcheck
|
|
c.SetWriteDeadline(deadline) //nolint:errcheck
|
|
return nil
|
|
}
|
|
|
|
func (c *pipeConn) SetReadDeadline(deadline time.Time) error {
|
|
if c.readDeadlineTimer == nil {
|
|
c.readDeadlineTimer = time.NewTimer(time.Hour)
|
|
}
|
|
readDeadlineCh := updateTimer(c.readDeadlineTimer, deadline)
|
|
c.readDeadlineChLock.Lock()
|
|
c.readDeadlineCh = readDeadlineCh
|
|
c.readDeadlineChLock.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (c *pipeConn) SetWriteDeadline(deadline time.Time) error {
|
|
if c.writeDeadlineTimer == nil {
|
|
c.writeDeadlineTimer = time.NewTimer(time.Hour)
|
|
}
|
|
c.writeDeadlineCh = updateTimer(c.writeDeadlineTimer, deadline)
|
|
return nil
|
|
}
|
|
|
|
// updateTimer re-arms t for the given deadline and returns the channel that
// signals when the deadline has been reached.
//
// The timer is always stopped first, and a tick that is already pending on
// t.C is drained so that a stale expiry is not observed later.
//
// The result depends on deadline:
//   - a zero deadline yields a nil channel, which never becomes ready;
//   - a deadline that is not in the future yields closedDeadlineCh, which is
//     always ready;
//   - otherwise t is reset to fire at deadline and t.C is returned.
func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	if deadline.IsZero() {
		return nil
	}
	// time.Until saturates at the maximum Duration for deadlines that are
	// very far in the future. Negating time.Since(deadline) would overflow
	// in that case and make such a deadline look already expired.
	d := time.Until(deadline)
	if d <= 0 {
		return closedDeadlineCh
	}
	t.Reset(d)
	return t.C
}

// closedDeadlineCh is an already-closed channel used to represent
// a deadline that has expired: receiving from it never blocks.
var closedDeadlineCh = func() <-chan time.Time {
	ch := make(chan time.Time)
	close(ch)
	return ch
}()
|
|
|
|
// pipeAddr is the net.Addr implementation reported by both pipe ends.
type pipeAddr int

// Network returns the name of the network, which is always "pipe".
func (pipeAddr) Network() string { return "pipe" }

// String returns the textual form of the address, which is always "pipe".
func (a pipeAddr) String() string { return a.Network() }
|
|
|
|
// byteBuffer is a reusable chunk of bytes passed between the pipe ends.
type byteBuffer struct {
	b []byte
}

// acquireByteBuffer takes a buffer from byteBufferPool.
//
// The returned buffer may still hold data from a previous use.
func acquireByteBuffer() *byteBuffer {
	v := byteBufferPool.Get()
	return v.(*byteBuffer)
}

// releaseByteBuffer puts b back into byteBufferPool. A nil b is ignored.
func releaseByteBuffer(b *byteBuffer) {
	if b == nil {
		return
	}
	byteBufferPool.Put(b)
}

// byteBufferPool holds byteBuffer values for reuse; new buffers start with
// a 1024-byte slice.
var byteBufferPool = &sync.Pool{
	New: func() interface{} {
		bb := new(byteBuffer)
		bb.b = make([]byte, 1024)
		return bb
	},
}
|