2019-05-22 21:16:55 +00:00
package vminsert
import (
"flag"
"fmt"
"net/http"
"strings"
2019-05-29 09:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
2019-08-22 09:27:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheus"
2019-12-09 18:58:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-08-23 05:45:11 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
2019-12-13 22:29:14 +00:00
graphiteListenAddr = flag . String ( "graphiteListenAddr" , "" , "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty" )
opentsdbListenAddr = flag . String ( "opentsdbListenAddr" , "" , "TCP and UDP address to listen for OpentTSDB metrics. " +
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. " +
"Usually :4242 must be set. Doesn't work if empty" )
2019-08-22 09:27:18 +00:00
opentsdbHTTPListenAddr = flag . String ( "opentsdbHTTPListenAddr" , "" , "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty" )
maxInsertRequestSize = flag . Int ( "maxInsertRequestSize" , 32 * 1024 * 1024 , "The maximum size of a single insert request in bytes" )
2019-08-23 05:45:11 +00:00
maxLabelsPerTimeseries = flag . Int ( "maxLabelsPerTimeseries" , 30 , "The maximum number of labels accepted per time series. Superflouos labels are dropped" )
2019-05-22 21:16:55 +00:00
)
2019-12-13 22:29:14 +00:00
var (
graphiteServer * graphite . Server
opentsdbServer * opentsdb . Server
opentsdbhttpServer * opentsdbhttp . Server
)
2019-05-22 21:16:55 +00:00
// Init initializes vminsert.
func Init ( ) {
2019-08-23 05:45:11 +00:00
storage . SetMaxLabelsPerTimeseries ( * maxLabelsPerTimeseries )
2019-05-29 09:35:47 +00:00
concurrencylimiter . Init ( )
2019-05-22 21:16:55 +00:00
if len ( * graphiteListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
graphiteServer = graphite . MustStart ( * graphiteListenAddr )
2019-05-22 21:16:55 +00:00
}
if len ( * opentsdbListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbServer = opentsdb . MustStart ( * opentsdbListenAddr , int64 ( * maxInsertRequestSize ) )
2019-05-22 21:16:55 +00:00
}
2019-08-22 09:27:18 +00:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbhttpServer = opentsdbhttp . MustStart ( * opentsdbHTTPListenAddr , int64 ( * maxInsertRequestSize ) )
2019-08-22 09:27:18 +00:00
}
2019-05-22 21:16:55 +00:00
}
// Stop stops vminsert.
func Stop ( ) {
if len ( * graphiteListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
graphiteServer . MustStop ( )
2019-05-22 21:16:55 +00:00
}
if len ( * opentsdbListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbServer . MustStop ( )
2019-05-22 21:16:55 +00:00
}
2019-08-22 09:27:18 +00:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2019-12-13 22:29:14 +00:00
opentsdbhttpServer . MustStop ( )
2019-08-22 09:27:18 +00:00
}
2019-05-22 21:16:55 +00:00
}
// RequestHandler is a handler for Prometheus remote storage write API
func RequestHandler ( w http . ResponseWriter , r * http . Request ) bool {
path := strings . Replace ( r . URL . Path , "//" , "/" , - 1 )
switch path {
case "/api/v1/write" :
prometheusWriteRequests . Inc ( )
if err := prometheus . InsertHandler ( r , int64 ( * maxInsertRequestSize ) ) ; err != nil {
prometheusWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-12-09 18:58:19 +00:00
case "/api/v1/import" :
vmimportRequests . Inc ( )
if err := vmimport . InsertHandler ( r ) ; err != nil {
vmimportErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
return true
2019-05-22 21:16:55 +00:00
case "/write" , "/api/v2/write" :
influxWriteRequests . Inc ( )
if err := influx . InsertHandler ( r ) ; err != nil {
influxWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
case "/query" :
2019-06-03 15:37:59 +00:00
// Emulate fake response for influx query.
// This is required for TSBS benchmark.
2019-05-22 21:16:55 +00:00
influxQueryRequests . Inc ( )
fmt . Fprintf ( w , ` { "results":[ { "series":[ { "values":[]}]}]} ` )
return true
default :
// This is not our link
return false
}
}
var (
prometheusWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/write", protocol="prometheus"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/write", protocol="prometheus"} ` )
2019-12-09 18:58:19 +00:00
vmimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/import", protocol="vm"} ` )
vmimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/import", protocol="vm"} ` )
2019-05-22 21:16:55 +00:00
influxWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/write", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/write", protocol="influx"} ` )
influxQueryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/query", protocol="influx"} ` )
)