2019-05-22 21:23:23 +00:00
package main
2019-05-22 21:16:55 +00:00
import (
"flag"
"fmt"
2020-02-23 11:35:47 +00:00
"io"
2019-05-22 21:16:55 +00:00
"net/http"
2019-05-22 21:23:23 +00:00
"time"
2019-05-22 21:16:55 +00:00
2020-03-10 17:35:58 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
2019-08-22 09:27:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
2019-12-09 18:58:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
2020-02-10 11:26:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-11-12 14:29:43 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2020-02-23 11:35:47 +00:00
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
2020-02-25 17:09:46 +00:00
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
2020-02-23 11:35:47 +00:00
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2019-08-23 05:45:11 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
2019-12-13 22:29:14 +00:00
graphiteListenAddr = flag . String ( "graphiteListenAddr" , "" , "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty" )
2020-02-25 17:09:46 +00:00
influxListenAddr = flag . String ( "influxListenAddr" , "" , "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty" )
2019-12-13 22:29:14 +00:00
opentsdbListenAddr = flag . String ( "opentsdbListenAddr" , "" , "TCP and UDP address to listen for OpentTSDB metrics. " +
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. " +
"Usually :4242 must be set. Doesn't work if empty" )
2019-08-22 09:27:18 +00:00
opentsdbHTTPListenAddr = flag . String ( "opentsdbHTTPListenAddr" , "" , "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty" )
httpListenAddr = flag . String ( "httpListenAddr" , ":8480" , "Address to listen for http connections" )
2019-08-23 05:45:11 +00:00
maxLabelsPerTimeseries = flag . Int ( "maxLabelsPerTimeseries" , 30 , "The maximum number of labels accepted per time series. Superflouos labels are dropped" )
2019-08-22 09:27:18 +00:00
storageNodes = flagutil . NewArray ( "storageNode" , "Address of vmstorage nodes; usage: -storageNode=vmstorage-host1:8400 -storageNode=vmstorage-host2:8400" )
2019-05-22 21:16:55 +00:00
)
2019-12-13 22:29:14 +00:00
var (
2020-02-25 17:09:46 +00:00
influxServer * influxserver . Server
2020-02-23 11:35:47 +00:00
graphiteServer * graphiteserver . Server
opentsdbServer * opentsdbserver . Server
opentsdbhttpServer * opentsdbhttpserver . Server
2019-12-13 22:29:14 +00:00
)
2019-05-22 21:23:23 +00:00
func main ( ) {
2020-02-10 11:26:18 +00:00
envflag . Parse ( )
2019-05-22 21:23:23 +00:00
buildinfo . Init ( )
logger . Init ( )
2019-07-20 07:21:59 +00:00
logger . Infof ( "initializing netstorage for storageNodes %s..." , * storageNodes )
2019-05-22 21:23:23 +00:00
startTime := time . Now ( )
2019-06-18 07:26:44 +00:00
if len ( * storageNodes ) == 0 {
2019-07-20 07:21:59 +00:00
logger . Fatalf ( "missing -storageNode arg" )
2019-05-22 21:23:23 +00:00
}
2019-06-18 07:26:44 +00:00
netstorage . InitStorageNodes ( * storageNodes )
2020-01-22 16:27:44 +00:00
logger . Infof ( "successfully initialized netstorage in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:23:23 +00:00
2019-08-23 05:45:11 +00:00
storage . SetMaxLabelsPerTimeseries ( * maxLabelsPerTimeseries )
2020-02-23 11:35:47 +00:00
writeconcurrencylimiter . Init ( )
2020-02-25 17:09:46 +00:00
if len ( * influxListenAddr ) > 0 {
influxServer = influxserver . MustStart ( * influxListenAddr , func ( r io . Reader ) error {
var at auth . Token // TODO: properly initialize auth token
return influx . InsertHandlerForReader ( & at , r )
} )
}
2019-05-22 21:16:55 +00:00
if len ( * graphiteListenAddr ) > 0 {
2020-02-23 11:35:47 +00:00
graphiteServer = graphiteserver . MustStart ( * graphiteListenAddr , func ( r io . Reader ) error {
var at auth . Token // TODO: properly initialize auth token
return graphite . InsertHandler ( & at , r )
} )
2019-05-22 21:16:55 +00:00
}
if len ( * opentsdbListenAddr ) > 0 {
2020-02-23 11:35:47 +00:00
opentsdbServer = opentsdbserver . MustStart ( * opentsdbListenAddr , func ( r io . Reader ) error {
var at auth . Token // TODO: properly initialize auth token
return opentsdb . InsertHandler ( & at , r )
} , opentsdbhttp . InsertHandler )
2019-05-22 21:16:55 +00:00
}
2019-08-22 09:27:18 +00:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2020-02-23 11:35:47 +00:00
opentsdbhttpServer = opentsdbhttpserver . MustStart ( * opentsdbHTTPListenAddr , opentsdbhttp . InsertHandler )
2019-08-22 09:27:18 +00:00
}
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
go func ( ) {
httpserver . Serve ( * httpListenAddr , requestHandler )
} ( )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "service received signal %s" , sig )
logger . Infof ( "gracefully shutting down the service at %q" , * httpListenAddr )
startTime = time . Now ( )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the service: %s" , err )
}
2020-01-22 16:27:44 +00:00
logger . Infof ( "successfully shut down the service in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:23:23 +00:00
2020-02-25 17:09:46 +00:00
if len ( * influxListenAddr ) > 0 {
influxServer . MustStop ( )
}
2019-05-22 21:16:55 +00:00
if len ( * graphiteListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
graphiteServer . MustStop ( )
2019-05-22 21:16:55 +00:00
}
if len ( * opentsdbListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbServer . MustStop ( )
2019-05-22 21:16:55 +00:00
}
2019-08-22 09:27:18 +00:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbhttpServer . MustStop ( )
2019-08-22 09:27:18 +00:00
}
2019-05-22 21:23:23 +00:00
logger . Infof ( "shutting down neststorage..." )
startTime = time . Now ( )
netstorage . Stop ( )
2020-01-22 16:27:44 +00:00
logger . Infof ( "successfully stopped netstorage in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:23:23 +00:00
2019-11-12 14:29:43 +00:00
fs . MustStopDirRemover ( )
2019-05-22 21:23:23 +00:00
logger . Infof ( "the vminsert has been stopped" )
2019-05-22 21:16:55 +00:00
}
2019-05-22 21:23:23 +00:00
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
p , err := httpserver . ParsePath ( r . URL . Path )
if err != nil {
httpserver . Errorf ( w , "cannot parse path %q: %s" , r . URL . Path , err )
return true
}
if p . Prefix != "insert" {
// This is not our link.
return false
}
at , err := auth . NewToken ( p . AuthToken )
if err != nil {
httpserver . Errorf ( w , "auth error: %s" , err )
return true
}
switch p . Suffix {
2019-06-03 15:17:25 +00:00
case "prometheus/" , "prometheus" , "prometheus/api/v1/write" :
2019-05-22 21:16:55 +00:00
prometheusWriteRequests . Inc ( )
2020-02-23 11:35:47 +00:00
if err := promremotewrite . InsertHandler ( at , r ) ; err != nil {
2019-05-22 21:16:55 +00:00
prometheusWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-12-09 18:58:19 +00:00
case "prometheus/api/v1/import" :
vmimportRequests . Inc ( )
if err := vmimport . InsertHandler ( at , r ) ; err != nil {
vmimportErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
2019-12-18 23:21:49 +00:00
w . WriteHeader ( http . StatusNoContent )
2019-12-09 18:58:19 +00:00
return true
2020-03-10 17:35:58 +00:00
case "prometheus/api/v1/import/csv" :
csvimportRequests . Inc ( )
if err := csvimport . InsertHandler ( at , r ) ; err != nil {
csvimportErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 21:23:23 +00:00
case "influx/write" , "influx/api/v2/write" :
2019-05-22 21:16:55 +00:00
influxWriteRequests . Inc ( )
2020-02-25 17:09:46 +00:00
if err := influx . InsertHandlerForHTTP ( at , r ) ; err != nil {
2019-05-22 21:16:55 +00:00
influxWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 21:23:23 +00:00
case "influx/query" :
2019-06-03 15:37:59 +00:00
// Emulate fake response for influx query.
// This is required for TSBS benchmark.
2019-05-22 21:16:55 +00:00
influxQueryRequests . Inc ( )
fmt . Fprintf ( w , ` { "results":[ { "series":[ { "values":[]}]}]} ` )
return true
default :
// This is not our link
return false
}
}
var (
2020-03-10 17:35:58 +00:00
prometheusWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/", protocol="promremotewrite"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/", protocol="promremotewrite"} ` )
vmimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import", protocol="vmimport"} ` )
vmimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import", protocol="vmimport"} ` )
2019-05-22 21:16:55 +00:00
2020-03-10 17:35:58 +00:00
csvimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import/csv", protocol="csvimport"} ` )
csvimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import/csv", protocol="csvimport"} ` )
2019-12-09 18:58:19 +00:00
2019-05-22 21:23:23 +00:00
influxWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/influx/", protocol="influx"} ` )
2019-05-22 21:16:55 +00:00
2019-05-22 21:23:23 +00:00
influxQueryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/query", protocol="influx"} ` )
2019-05-22 21:16:55 +00:00
)