mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vminsert: simultaneously accept telnet put
and HTTP /api/put
OpenTSDB metrics at -opentsdbListenAddr
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/266
This commit is contained in:
parent
bc3984a5b3
commit
a7bf8e77af
7 changed files with 362 additions and 118 deletions
|
@ -135,6 +135,7 @@ with [the official Grafana dashboard for VictoriaMetrics cluster](https://grafan
|
||||||
- `<suffix>` may have the following values:
|
- `<suffix>` may have the following values:
|
||||||
- `prometheus` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
|
- `prometheus` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
|
||||||
- `influx/write` or `influx/api/v2/write` - for inserting data with [Influx line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
|
- `influx/write` or `influx/api/v2/write` - for inserting data with [Influx line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
|
||||||
|
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html).
|
||||||
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
|
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
|
||||||
|
|
||||||
* URLs for querying: `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
|
* URLs for querying: `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
|
||||||
|
|
|
@ -22,36 +22,62 @@ var (
|
||||||
writeErrorsUDP = metrics.NewCounter(`vm_graphite_request_errors_total{name="write", net="udp"}`)
|
writeErrorsUDP = metrics.NewCounter(`vm_graphite_request_errors_total{name="write", net="udp"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Serve starts graphite server on the given addr.
|
// Server accepts Graphite plaintext lines over TCP and UDP.
|
||||||
func Serve(addr string) {
|
type Server struct {
|
||||||
|
addr string
|
||||||
|
lnTCP net.Listener
|
||||||
|
lnUDP net.PacketConn
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustStart starts graphite server on the given addr.
|
||||||
|
//
|
||||||
|
// MustStop must be called on the returned server when it is no longer needed.
|
||||||
|
func MustStart(addr string) *Server {
|
||||||
logger.Infof("starting TCP Graphite server at %q", addr)
|
logger.Infof("starting TCP Graphite server at %q", addr)
|
||||||
lnTCP, err := netutil.NewTCPListener("graphite", addr)
|
lnTCP, err := netutil.NewTCPListener("graphite", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err)
|
logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err)
|
||||||
}
|
}
|
||||||
listenerTCP = lnTCP
|
|
||||||
|
|
||||||
logger.Infof("starting UDP Graphite server at %q", addr)
|
logger.Infof("starting UDP Graphite server at %q", addr)
|
||||||
lnUDP, err := net.ListenPacket("udp4", addr)
|
lnUDP, err := net.ListenPacket("udp4", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot start UDP Graphite server at %q: %s", addr, err)
|
logger.Fatalf("cannot start UDP Graphite server at %q: %s", addr, err)
|
||||||
}
|
}
|
||||||
listenerUDP = lnUDP
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
s := &Server{
|
||||||
wg.Add(1)
|
addr: addr,
|
||||||
|
lnTCP: lnTCP,
|
||||||
|
lnUDP: lnUDP,
|
||||||
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer s.wg.Done()
|
||||||
serveTCP(listenerTCP)
|
serveTCP(lnTCP)
|
||||||
logger.Infof("stopped TCP Graphite server at %q", addr)
|
logger.Infof("stopped TCP Graphite server at %q", addr)
|
||||||
}()
|
}()
|
||||||
wg.Add(1)
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer s.wg.Done()
|
||||||
serveUDP(listenerUDP)
|
serveUDP(lnUDP)
|
||||||
logger.Infof("stopped UDP Graphite server at %q", addr)
|
logger.Infof("stopped UDP Graphite server at %q", addr)
|
||||||
}()
|
}()
|
||||||
wg.Wait()
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustStop stops the server.
|
||||||
|
func (s *Server) MustStop() {
|
||||||
|
logger.Infof("stopping TCP Graphite server at %q...", s.addr)
|
||||||
|
if err := s.lnTCP.Close(); err != nil {
|
||||||
|
logger.Errorf("cannot close TCP Graphite server: %s", err)
|
||||||
|
}
|
||||||
|
logger.Infof("stopping UDP Graphite server at %q...", s.addr)
|
||||||
|
if err := s.lnUDP.Close(); err != nil {
|
||||||
|
logger.Errorf("cannot close UDP Graphite server: %s", err)
|
||||||
|
}
|
||||||
|
s.wg.Wait()
|
||||||
|
logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func serveTCP(ln net.Listener) {
|
func serveTCP(ln net.Listener) {
|
||||||
|
@ -60,6 +86,7 @@ func serveTCP(ln net.Listener) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ne, ok := err.(net.Error); ok {
|
if ne, ok := err.(net.Error); ok {
|
||||||
if ne.Temporary() {
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", ln.Addr(), err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -100,6 +127,7 @@ func serveUDP(ln net.PacketConn) {
|
||||||
writeErrorsUDP.Inc()
|
writeErrorsUDP.Inc()
|
||||||
if ne, ok := err.(net.Error); ok {
|
if ne, ok := err.(net.Error); ok {
|
||||||
if ne.Temporary() {
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -122,20 +150,3 @@ func serveUDP(ln net.PacketConn) {
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
listenerTCP net.Listener
|
|
||||||
listenerUDP net.PacketConn
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stop stops the server.
|
|
||||||
func Stop() {
|
|
||||||
logger.Infof("stopping TCP Graphite server at %q...", listenerTCP.Addr())
|
|
||||||
if err := listenerTCP.Close(); err != nil {
|
|
||||||
logger.Errorf("cannot close TCP Graphite server: %s", err)
|
|
||||||
}
|
|
||||||
logger.Infof("stopping UDP Graphite server at %q...", listenerUDP.LocalAddr())
|
|
||||||
if err := listenerUDP.Close(); err != nil {
|
|
||||||
logger.Errorf("cannot close UDP Graphite server: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -26,8 +26,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
|
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
|
||||||
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB put messages. Usually :4242 must be set. Doesn't work if empty")
|
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
|
||||||
|
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
|
||||||
|
"Usually :4242 must be set. Doesn't work if empty")
|
||||||
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
|
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
|
||||||
httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections")
|
httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections")
|
||||||
maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes")
|
maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes")
|
||||||
|
@ -35,6 +37,12 @@ var (
|
||||||
storageNodes = flagutil.NewArray("storageNode", "Address of vmstorage nodes; usage: -storageNode=vmstorage-host1:8400 -storageNode=vmstorage-host2:8400")
|
storageNodes = flagutil.NewArray("storageNode", "Address of vmstorage nodes; usage: -storageNode=vmstorage-host1:8400 -storageNode=vmstorage-host2:8400")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
graphiteServer *graphite.Server
|
||||||
|
opentsdbServer *opentsdb.Server
|
||||||
|
opentsdbhttpServer *opentsdbhttp.Server
|
||||||
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
|
@ -52,13 +60,13 @@ func main() {
|
||||||
|
|
||||||
concurrencylimiter.Init()
|
concurrencylimiter.Init()
|
||||||
if len(*graphiteListenAddr) > 0 {
|
if len(*graphiteListenAddr) > 0 {
|
||||||
go graphite.Serve(*graphiteListenAddr)
|
graphiteServer = graphite.MustStart(*graphiteListenAddr)
|
||||||
}
|
}
|
||||||
if len(*opentsdbListenAddr) > 0 {
|
if len(*opentsdbListenAddr) > 0 {
|
||||||
go opentsdb.Serve(*opentsdbListenAddr)
|
opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr, int64(*maxInsertRequestSize))
|
||||||
}
|
}
|
||||||
if len(*opentsdbHTTPListenAddr) > 0 {
|
if len(*opentsdbHTTPListenAddr) > 0 {
|
||||||
go opentsdbhttp.Serve(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize))
|
opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -76,13 +84,13 @@ func main() {
|
||||||
logger.Infof("successfully shut down the service in %s", time.Since(startTime))
|
logger.Infof("successfully shut down the service in %s", time.Since(startTime))
|
||||||
|
|
||||||
if len(*graphiteListenAddr) > 0 {
|
if len(*graphiteListenAddr) > 0 {
|
||||||
graphite.Stop()
|
graphiteServer.MustStop()
|
||||||
}
|
}
|
||||||
if len(*opentsdbListenAddr) > 0 {
|
if len(*opentsdbListenAddr) > 0 {
|
||||||
opentsdb.Stop()
|
opentsdbServer.MustStop()
|
||||||
}
|
}
|
||||||
if len(*opentsdbHTTPListenAddr) > 0 {
|
if len(*opentsdbHTTPListenAddr) > 0 {
|
||||||
opentsdbhttp.Stop()
|
opentsdbhttpServer.MustStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Infof("shutting down neststorage...")
|
logger.Infof("shutting down neststorage...")
|
||||||
|
|
159
app/vminsert/opentsdb/listener_switch.go
Normal file
159
app/vminsert/opentsdb/listener_switch.go
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
package opentsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
// listenerSwitch listens for incoming connections and multiplexes them to OpenTSDB http or telnet listeners
|
||||||
|
// depending on the first byte in the accepted connection.
|
||||||
|
//
|
||||||
|
// It is expected that both listeners - http and telnet consume incoming connections as soon as possible.
|
||||||
|
type listenerSwitch struct {
|
||||||
|
ln net.Listener
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
telnetConnsCh chan net.Conn
|
||||||
|
httpConnsCh chan net.Conn
|
||||||
|
|
||||||
|
closeLock sync.Mutex
|
||||||
|
closed bool
|
||||||
|
acceptErr error
|
||||||
|
closeErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newListenerSwitch(ln net.Listener) *listenerSwitch {
|
||||||
|
ls := &listenerSwitch{
|
||||||
|
ln: ln,
|
||||||
|
}
|
||||||
|
ls.telnetConnsCh = make(chan net.Conn)
|
||||||
|
ls.httpConnsCh = make(chan net.Conn)
|
||||||
|
ls.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
ls.worker()
|
||||||
|
close(ls.telnetConnsCh)
|
||||||
|
close(ls.httpConnsCh)
|
||||||
|
ls.wg.Done()
|
||||||
|
}()
|
||||||
|
return ls
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *listenerSwitch) stop() error {
|
||||||
|
var err error
|
||||||
|
ls.closeLock.Lock()
|
||||||
|
if !ls.closed {
|
||||||
|
err = ls.ln.Close()
|
||||||
|
ls.closeErr = err
|
||||||
|
ls.closed = true
|
||||||
|
}
|
||||||
|
ls.closeLock.Unlock()
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
// Wait until worker detects the closed ls.ln and exits.
|
||||||
|
ls.wg.Wait()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *listenerSwitch) worker() {
|
||||||
|
var buf [1]byte
|
||||||
|
for {
|
||||||
|
c, err := ls.ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||||
|
logger.Infof("listenerSwitch: temporary error at %q: %s; sleeping for a second...", ls.ln.Addr(), err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ls.closeLock.Lock()
|
||||||
|
ls.acceptErr = err
|
||||||
|
ls.closeLock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := io.ReadFull(c, buf[:]); err != nil {
|
||||||
|
logger.Errorf("listenerSwitch: cannot read one byte from the underlying connection for %q: %s", ls.ln.Addr(), err)
|
||||||
|
_ = c.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// It is expected that both listeners - http and telnet consume incoming connections as soon as possible,
|
||||||
|
// so the below code shouldn't block for extended periods of time.
|
||||||
|
pc := &peekedConn{
|
||||||
|
Conn: c,
|
||||||
|
firstChar: buf[0],
|
||||||
|
}
|
||||||
|
if buf[0] == 'p' {
|
||||||
|
// Assume the request starts with `put`.
|
||||||
|
ls.telnetConnsCh <- pc
|
||||||
|
} else {
|
||||||
|
// Assume the request starts with `POST`.
|
||||||
|
ls.httpConnsCh <- pc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type peekedConn struct {
|
||||||
|
net.Conn
|
||||||
|
firstChar byte
|
||||||
|
firstCharRead bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *peekedConn) Read(p []byte) (int, error) {
|
||||||
|
// It is assumed that the pc cannot be read from concurrent goroutines.
|
||||||
|
if pc.firstCharRead {
|
||||||
|
// Fast path - first char already read.
|
||||||
|
return pc.Conn.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - read the first char.
|
||||||
|
if len(p) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
p[0] = pc.firstChar
|
||||||
|
pc.firstCharRead = true
|
||||||
|
n, err := pc.Conn.Read(p[1:])
|
||||||
|
return n + 1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *listenerSwitch) newTelnetListener() *chanListener {
|
||||||
|
return &chanListener{
|
||||||
|
ls: ls,
|
||||||
|
ch: ls.telnetConnsCh,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *listenerSwitch) newHTTPListener() *chanListener {
|
||||||
|
return &chanListener{
|
||||||
|
ls: ls,
|
||||||
|
ch: ls.httpConnsCh,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type chanListener struct {
|
||||||
|
ls *listenerSwitch
|
||||||
|
ch chan net.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl *chanListener) Accept() (net.Conn, error) {
|
||||||
|
c, ok := <-cl.ch
|
||||||
|
if ok {
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.ls.closeLock.Lock()
|
||||||
|
err := cl.ls.acceptErr
|
||||||
|
cl.ls.closeLock.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl *chanListener) Close() error {
|
||||||
|
return cl.ls.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl *chanListener) Addr() net.Addr {
|
||||||
|
return cl.ls.ln.Addr()
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
@ -22,44 +23,91 @@ var (
|
||||||
writeErrorsUDP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="udp"}`)
|
writeErrorsUDP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="udp"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Serve starts OpenTSDB collector on the given addr.
|
// Server is a server for collecting OpenTSDB TCP and UDP metrics.
|
||||||
func Serve(addr string) {
|
//
|
||||||
|
// It accepts simultaneously Telnet put requests and HTTP put requests over TCP.
|
||||||
|
type Server struct {
|
||||||
|
addr string
|
||||||
|
ls *listenerSwitch
|
||||||
|
httpServer *opentsdbhttp.Server
|
||||||
|
lnUDP net.PacketConn
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustStart starts OpenTSDB collector on the given addr.
|
||||||
|
//
|
||||||
|
// MustStop must be called on the returned server when it is no longer needed.
|
||||||
|
func MustStart(addr string, maxRequestSize int64) *Server {
|
||||||
logger.Infof("starting TCP OpenTSDB collector at %q", addr)
|
logger.Infof("starting TCP OpenTSDB collector at %q", addr)
|
||||||
lnTCP, err := netutil.NewTCPListener("opentsdb", addr)
|
lnTCP, err := netutil.NewTCPListener("opentsdb", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err)
|
logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err)
|
||||||
}
|
}
|
||||||
listenerTCP = lnTCP
|
ls := newListenerSwitch(lnTCP)
|
||||||
|
lnHTTP := ls.newHTTPListener()
|
||||||
|
lnTelnet := ls.newTelnetListener()
|
||||||
|
httpServer := opentsdbhttp.MustServe(lnHTTP, maxRequestSize)
|
||||||
|
|
||||||
logger.Infof("starting UDP OpenTSDB collector at %q", addr)
|
logger.Infof("starting UDP OpenTSDB collector at %q", addr)
|
||||||
lnUDP, err := net.ListenPacket("udp4", addr)
|
lnUDP, err := net.ListenPacket("udp4", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot start UDP OpenTSDB collector at %q: %s", addr, err)
|
logger.Fatalf("cannot start UDP OpenTSDB collector at %q: %s", addr, err)
|
||||||
}
|
}
|
||||||
listenerUDP = lnUDP
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
s := &Server{
|
||||||
wg.Add(1)
|
addr: addr,
|
||||||
|
ls: ls,
|
||||||
|
httpServer: httpServer,
|
||||||
|
lnUDP: lnUDP,
|
||||||
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer s.wg.Done()
|
||||||
serveTCP(listenerTCP)
|
serveTelnet(lnTelnet)
|
||||||
logger.Infof("stopped TCP OpenTSDB collector at %q", addr)
|
logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr)
|
||||||
}()
|
}()
|
||||||
wg.Add(1)
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer s.wg.Done()
|
||||||
serveUDP(listenerUDP)
|
httpServer.Wait()
|
||||||
logger.Infof("stopped UDP OpenTSDB collector at %q", addr)
|
// Do not log when httpServer is stopped, since this is logged by the server itself.
|
||||||
}()
|
}()
|
||||||
wg.Wait()
|
s.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
serveUDP(lnUDP)
|
||||||
|
logger.Infof("stopped UDP OpenTSDB server at %q", addr)
|
||||||
|
}()
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func serveTCP(ln net.Listener) {
|
// MustStop stops the server.
|
||||||
|
func (s *Server) MustStop() {
|
||||||
|
// Stop HTTP server. Do not emit log message, since it is emitted by the httpServer.
|
||||||
|
s.httpServer.MustStop()
|
||||||
|
|
||||||
|
logger.Infof("stopping TCP telnet OpenTSDB server at %q...", s.addr)
|
||||||
|
if err := s.ls.stop(); err != nil {
|
||||||
|
logger.Errorf("cannot stop TCP telnet OpenTSDB server: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("stopping UDP OpenTSDB server at %q...", s.addr)
|
||||||
|
if err := s.lnUDP.Close(); err != nil {
|
||||||
|
logger.Errorf("cannot stop UDP OpenTSDB server: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all the servers are stopped.
|
||||||
|
s.wg.Wait()
|
||||||
|
logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func serveTelnet(ln net.Listener) {
|
||||||
for {
|
for {
|
||||||
c, err := ln.Accept()
|
c, err := ln.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ne, ok := err.(net.Error); ok {
|
if ne, ok := err.(net.Error); ok {
|
||||||
if ne.Temporary() {
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("opentsdb: temporary error when listening for TCP addr %q: %s", ln.Addr(), err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -100,6 +148,7 @@ func serveUDP(ln net.PacketConn) {
|
||||||
writeErrorsUDP.Inc()
|
writeErrorsUDP.Inc()
|
||||||
if ne, ok := err.(net.Error); ok {
|
if ne, ok := err.(net.Error); ok {
|
||||||
if ne.Temporary() {
|
if ne.Temporary() {
|
||||||
|
logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -122,20 +171,3 @@ func serveUDP(ln net.PacketConn) {
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
listenerTCP net.Listener
|
|
||||||
listenerUDP net.PacketConn
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stop stops the server.
|
|
||||||
func Stop() {
|
|
||||||
logger.Infof("stopping TCP OpenTSDB server at %q...", listenerTCP.Addr())
|
|
||||||
if err := listenerTCP.Close(); err != nil {
|
|
||||||
logger.Errorf("cannot close TCP OpenTSDB server: %s", err)
|
|
||||||
}
|
|
||||||
logger.Infof("stopping UDP OpenTSDB server at %q...", listenerUDP.LocalAddr())
|
|
||||||
if err := listenerUDP.Close(); err != nil {
|
|
||||||
logger.Errorf("cannot close UDP OpenTSDB server: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,12 +2,15 @@ package opentsdbhttp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,72 +19,99 @@ var (
|
||||||
writeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/api/put", protocol="opentsdb-http"}`)
|
writeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/api/put", protocol="opentsdb-http"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
// Server represents HTTP OpenTSDB server.
|
||||||
httpServer *http.Server
|
type Server struct {
|
||||||
httpAddr string
|
s *http.Server
|
||||||
maxRequestSize int64
|
ln net.Listener
|
||||||
)
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
// Serve starts HTTP OpenTSDB server on the given addr.
|
// MustStart starts HTTP OpenTSDB server on the given addr.
|
||||||
func Serve(addr string, maxReqSize int64) {
|
//
|
||||||
|
// MustStop must be called on the returned server when it is no longer needed.
|
||||||
|
func MustStart(addr string, maxRequestSize int64) *Server {
|
||||||
logger.Infof("starting HTTP OpenTSDB server at %q", addr)
|
logger.Infof("starting HTTP OpenTSDB server at %q", addr)
|
||||||
httpAddr = addr
|
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr)
|
||||||
maxRequestSize = maxReqSize
|
if err != nil {
|
||||||
httpServer = &http.Server{
|
logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err)
|
||||||
Addr: addr,
|
}
|
||||||
Handler: http.HandlerFunc(requestHandler),
|
return MustServe(lnTCP, maxRequestSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustServe serves OpenTSDB HTTP put requests from ln with up to maxRequestSize size.
|
||||||
|
//
|
||||||
|
// MustStop must be called on the returned server when it is no longer needed.
|
||||||
|
func MustServe(ln net.Listener, maxRequestSize int64) *Server {
|
||||||
|
h := newRequestHandler(maxRequestSize)
|
||||||
|
hs := &http.Server{
|
||||||
|
Handler: h,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 10 * time.Second,
|
WriteTimeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
|
s := &Server{
|
||||||
|
s: hs,
|
||||||
|
ln: ln,
|
||||||
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
err := httpServer.ListenAndServe()
|
defer s.wg.Done()
|
||||||
|
err := s.s.Serve(s.ln)
|
||||||
if err == http.ErrServerClosed {
|
if err == http.ErrServerClosed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("error serving HTTP OpenTSDB: %s", err)
|
logger.Fatalf("error serving HTTP OpenTSDB at %q: %s", s.ln.Addr(), err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestHandler handles HTTP OpenTSDB insert request.
|
// Wait waits until the server is stopped with MustStop.
|
||||||
func requestHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) Wait() {
|
||||||
p, err := httpserver.ParsePath(r.URL.Path)
|
s.wg.Wait()
|
||||||
if err != nil {
|
|
||||||
httpserver.Errorf(w, "cannot parse path %q: %s", r.URL.Path, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if p.Prefix != "insert" {
|
|
||||||
// This is not our link.
|
|
||||||
httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
at, err := auth.NewToken(p.AuthToken)
|
|
||||||
if err != nil {
|
|
||||||
httpserver.Errorf(w, "auth error: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
switch p.Suffix {
|
|
||||||
case "api/put":
|
|
||||||
writeRequests.Inc()
|
|
||||||
if err := insertHandler(at, r, maxRequestSize); err != nil {
|
|
||||||
writeErrors.Inc()
|
|
||||||
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
default:
|
|
||||||
httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops HTTP OpenTSDB server.
|
// MustStop stops HTTP OpenTSDB server.
|
||||||
func Stop() {
|
func (s *Server) MustStop() {
|
||||||
logger.Infof("stopping HTTP OpenTSDB server at %q...", httpAddr)
|
logger.Infof("stopping HTTP OpenTSDB server at %q...", s.ln.Addr())
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := httpServer.Shutdown(ctx); err != nil {
|
if err := s.s.Shutdown(ctx); err != nil {
|
||||||
logger.Fatalf("cannot close HTTP OpenTSDB server: %s", err)
|
logger.Fatalf("cannot close HTTP OpenTSDB server at %q: %s", s.ln.Addr(), err)
|
||||||
}
|
}
|
||||||
|
s.wg.Wait()
|
||||||
|
logger.Infof("OpenTSDB HTTP server at %q has been stopped", s.ln.Addr())
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRequestHandler(maxRequestSize int64) http.Handler {
|
||||||
|
rh := func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
p, err := httpserver.ParsePath(r.URL.Path)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, "cannot parse path %q: %s", r.URL.Path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if p.Prefix != "insert" {
|
||||||
|
// This is not our link.
|
||||||
|
httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
at, err := auth.NewToken(p.AuthToken)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, "auth error: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch p.Suffix {
|
||||||
|
case "api/put", "opentsdb/api/put":
|
||||||
|
writeRequests.Inc()
|
||||||
|
if err := insertHandler(at, r, maxRequestSize); err != nil {
|
||||||
|
writeErrors.Inc()
|
||||||
|
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return http.HandlerFunc(rh)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -72,6 +73,8 @@ func (ln *TCPListener) Accept() (net.Conn, error) {
|
||||||
ln.accepts.Inc()
|
ln.accepts.Inc()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||||
|
logger.Errorf("temporary error when listening for TCP addr %q: %s", ln.Addr(), err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ln.acceptErrors.Inc()
|
ln.acceptErrors.Inc()
|
||||||
|
|
Loading…
Reference in a new issue