mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-11 14:53:49 +00:00
85de94e85c
* lib/protoparser: adds opentelemetry parser app/{vmagent,vminsert}: adds opentelemetry ingestion path Adds ability to ingest data with opentelemetry protocol protobuf and json encoding is supported data converted into prometheus protobuf timeseries each data type has own converter and it may produce multiple timeseries from single datapoint (for summary and histogram). only cumulative aggregationFamily is supported for sum(prometheus counter) and histogram. Apply suggestions from code review Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> updates deps fixes tests wip wip wip wip lib/protoparser/opentelemetry: moves to vtprotobuf generator go mod vendor lib/protoparse/opentelemetry: reduce memory allocations * wip - Remove support for JSON parsing, since it is too fragile and is rarely used in practice. The most clients send OpenTelemetry metrics in protobuf. The JSON parser can be added in the future if needed. - Remove unused code from lib/protoparser/opentelemetry/pb and lib/protoparser/opentelemetry/proto - Do not re-use protobuf message between ParseStream() calls, since there is high chance of high fragmentation of the re-used message because of too complex nested structure of the message. * wip * wip * wip --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
421 lines
18 KiB
Go
421 lines
18 KiB
Go
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentelemetry"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/influxutils"
|
|
clusternativeserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/clusternative"
|
|
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
|
|
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
|
|
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
|
|
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. "+
|
|
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty")
|
|
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
|
|
"See also -graphiteListenAddr.useProxyProtocol")
|
|
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
|
|
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
|
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
|
|
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
|
|
"See also -influxListenAddr.useProxyProtocol")
|
|
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
|
|
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
|
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+
|
|
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
|
|
"Usually :4242 must be set. Doesn't work if empty. "+
|
|
"See also -opentsdbListenAddr.useProxyProtocol")
|
|
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
|
|
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
|
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpenTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
|
|
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
|
|
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
|
|
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
|
httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
|
|
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
|
|
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
|
|
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
|
|
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented")
|
|
maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented")
|
|
storageNodes = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+
|
|
"Enterprise version of VictoriaMetrics supports automatic discovery of vmstorage addresses via dns+srv records. For example, -storageNode=dns+srv:vmstorage.addrs . "+
|
|
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery")
|
|
)
|
|
|
|
var (
|
|
clusternativeServer *clusternativeserver.Server
|
|
graphiteServer *graphiteserver.Server
|
|
influxServer *influxserver.Server
|
|
opentsdbServer *opentsdbserver.Server
|
|
opentsdbhttpServer *opentsdbhttpserver.Server
|
|
)
|
|
|
|
func main() {
|
|
// Write flags and help message to stdout, since it is easier to grep or pipe.
|
|
flag.CommandLine.SetOutput(os.Stdout)
|
|
flag.Usage = usage
|
|
envflag.Parse()
|
|
buildinfo.Init()
|
|
logger.Init()
|
|
pushmetrics.Init()
|
|
|
|
logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes)
|
|
startTime := time.Now()
|
|
if len(*storageNodes) == 0 {
|
|
logger.Fatalf("missing -storageNode arg")
|
|
}
|
|
if hasEmptyValues(*storageNodes) {
|
|
logger.Fatalf("found empty address of storage node in the -storageNodes flag, please make sure that all -storageNode args are non-empty")
|
|
}
|
|
if duplicatedAddr := checkDuplicates(*storageNodes); duplicatedAddr != "" {
|
|
logger.Fatalf("found equal addresses of storage nodes in the -storageNodes flag: %q", duplicatedAddr)
|
|
}
|
|
hashSeed := uint64(0)
|
|
if *clusternativeListenAddr != "" {
|
|
// Use different hash seed for the second level of vminsert nodes in multi-level cluster setup.
|
|
// This should fix uneven distribution of time series among storage nodes.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672
|
|
hashSeed = 0xabcdef0123456789
|
|
}
|
|
netstorage.Init(*storageNodes, hashSeed)
|
|
logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds())
|
|
|
|
relabel.Init()
|
|
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
|
|
storage.SetMaxLabelValueLen(*maxLabelValueLen)
|
|
common.StartUnmarshalWorkers()
|
|
if len(*clusternativeListenAddr) > 0 {
|
|
clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
|
|
return clusternative.InsertHandler(c)
|
|
})
|
|
}
|
|
if len(*graphiteListenAddr) > 0 {
|
|
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, func(r io.Reader) error {
|
|
return graphite.InsertHandler(nil, r)
|
|
})
|
|
}
|
|
if len(*influxListenAddr) > 0 {
|
|
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
|
|
return influx.InsertHandlerForReader(nil, r)
|
|
})
|
|
}
|
|
if len(*opentsdbListenAddr) > 0 {
|
|
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, func(r io.Reader) error {
|
|
return opentsdb.InsertHandler(nil, r)
|
|
}, opentsdbhttp.InsertHandler)
|
|
}
|
|
if len(*opentsdbHTTPListenAddr) > 0 {
|
|
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
|
|
}
|
|
|
|
go func() {
|
|
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
|
}()
|
|
|
|
sig := procutil.WaitForSigterm()
|
|
logger.Infof("service received signal %s", sig)
|
|
|
|
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
|
startTime = time.Now()
|
|
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
|
logger.Fatalf("cannot stop http service: %s", err)
|
|
}
|
|
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
|
|
|
if len(*clusternativeListenAddr) > 0 {
|
|
clusternativeServer.MustStop()
|
|
}
|
|
if len(*graphiteListenAddr) > 0 {
|
|
graphiteServer.MustStop()
|
|
}
|
|
if len(*influxListenAddr) > 0 {
|
|
influxServer.MustStop()
|
|
}
|
|
if len(*opentsdbListenAddr) > 0 {
|
|
opentsdbServer.MustStop()
|
|
}
|
|
if len(*opentsdbHTTPListenAddr) > 0 {
|
|
opentsdbhttpServer.MustStop()
|
|
}
|
|
common.StopUnmarshalWorkers()
|
|
|
|
logger.Infof("shutting down neststorage...")
|
|
startTime = time.Now()
|
|
netstorage.MustStop()
|
|
logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())
|
|
|
|
fs.MustStopDirRemover()
|
|
|
|
logger.Infof("the vminsert has been stopped")
|
|
}
|
|
|
|
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|
startTime := time.Now()
|
|
defer requestDuration.UpdateDuration(startTime)
|
|
|
|
if r.URL.Path == "/" {
|
|
if r.Method != http.MethodGet {
|
|
return false
|
|
}
|
|
w.Header().Add("Content-Type", "text/html; charset=utf-8")
|
|
fmt.Fprintf(w, `vminsert - a component of VictoriaMetrics cluster<br/>
|
|
<a href="https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html">docs</a><br>
|
|
`)
|
|
return true
|
|
}
|
|
p, err := httpserver.ParsePath(r.URL.Path)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse path %q: %s", r.URL.Path, err)
|
|
return true
|
|
}
|
|
if p.Prefix != "insert" {
|
|
// This is not our link.
|
|
return false
|
|
}
|
|
at, err := auth.NewToken(p.AuthToken)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "auth error: %s", err)
|
|
return true
|
|
}
|
|
|
|
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus") {
|
|
prometheusimportRequests.Inc()
|
|
if err := prometheusimport.InsertHandler(at, r); err != nil {
|
|
prometheusimportErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
statusCode := http.StatusNoContent
|
|
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus/metrics/job/") {
|
|
// Return 200 status code for pushgateway requests.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636
|
|
statusCode = http.StatusOK
|
|
}
|
|
w.WriteHeader(statusCode)
|
|
return true
|
|
}
|
|
if strings.HasPrefix(p.Suffix, "datadog/") {
|
|
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
|
|
p.Suffix = strings.TrimSuffix(p.Suffix, "/")
|
|
}
|
|
switch p.Suffix {
|
|
case "prometheus/", "prometheus", "prometheus/api/v1/write":
|
|
if common.HandleVMProtoServerHandshake(w, r) {
|
|
return true
|
|
}
|
|
prometheusWriteRequests.Inc()
|
|
if err := promremotewrite.InsertHandler(at, r); err != nil {
|
|
prometheusWriteErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
case "prometheus/api/v1/import":
|
|
vmimportRequests.Inc()
|
|
if err := vmimport.InsertHandler(at, r); err != nil {
|
|
vmimportErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
case "prometheus/api/v1/import/csv":
|
|
csvimportRequests.Inc()
|
|
if err := csvimport.InsertHandler(at, r); err != nil {
|
|
csvimportErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
case "prometheus/api/v1/import/native":
|
|
nativeimportRequests.Inc()
|
|
if err := native.InsertHandler(at, r); err != nil {
|
|
nativeimportErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
case "influx/write", "influx/api/v2/write":
|
|
influxWriteRequests.Inc()
|
|
addInfluxResponseHeaders(w)
|
|
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
|
|
influxWriteErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
case "influx/query":
|
|
influxQueryRequests.Inc()
|
|
addInfluxResponseHeaders(w)
|
|
influxutils.WriteDatabaseNames(w)
|
|
return true
|
|
case "opentelemetry/api/v1/push":
|
|
opentelemetryPushRequests.Inc()
|
|
if err := opentelemetry.InsertHandler(at, r); err != nil {
|
|
opentelemetryPushErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
return true
|
|
case "datadog/api/v1/series":
|
|
datadogWriteRequests.Inc()
|
|
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
|
datadogWriteErrors.Inc()
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(202)
|
|
fmt.Fprintf(w, `{"status":"ok"}`)
|
|
return true
|
|
case "datadog/api/v1/validate":
|
|
datadogValidateRequests.Inc()
|
|
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
|
w.Header().Set("Content-Type", "application/json")
|
|
fmt.Fprintf(w, `{"valid":true}`)
|
|
return true
|
|
case "datadog/api/v1/check_run":
|
|
datadogCheckRunRequests.Inc()
|
|
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(202)
|
|
fmt.Fprintf(w, `{"status":"ok"}`)
|
|
return true
|
|
case "datadog/intake":
|
|
datadogIntakeRequests.Inc()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
fmt.Fprintf(w, `{}`)
|
|
return true
|
|
case "datadog/api/v1/metadata":
|
|
datadogMetadataRequests.Inc()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
fmt.Fprintf(w, `{}`)
|
|
return true
|
|
default:
|
|
// This is not our link
|
|
return false
|
|
}
|
|
}
|
|
|
|
func addInfluxResponseHeaders(w http.ResponseWriter) {
|
|
// This is needed for some clients, which expect InfluxDB version header.
|
|
// See, for example, https://github.com/ntop/ntopng/issues/5449#issuecomment-1005347597
|
|
w.Header().Set("X-Influxdb-Version", "1.8.0")
|
|
}
|
|
|
|
var (
|
|
requestDuration = metrics.NewHistogram(`vminsert_request_duration_seconds`)
|
|
|
|
prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/", protocol="promremotewrite"}`)
|
|
prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/", protocol="promremotewrite"}`)
|
|
|
|
vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import", protocol="vmimport"}`)
|
|
vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import", protocol="vmimport"}`)
|
|
|
|
csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/csv", protocol="csvimport"}`)
|
|
csvimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/csv", protocol="csvimport"}`)
|
|
|
|
prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
|
prometheusimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
|
|
|
nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
|
|
nativeimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
|
|
|
|
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/write", protocol="influx"}`)
|
|
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/write", protocol="influx"}`)
|
|
|
|
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/query", protocol="influx"}`)
|
|
|
|
opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
|
|
opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
|
|
|
|
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
|
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
|
|
|
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/validate", protocol="datadog"}`)
|
|
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/check_run", protocol="datadog"}`)
|
|
datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/intake", protocol="datadog"}`)
|
|
datadogMetadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/metadata", protocol="datadog"}`)
|
|
|
|
_ = metrics.NewGauge(`vm_metrics_with_dropped_labels_total`, func() float64 {
|
|
return float64(atomic.LoadUint64(&storage.MetricsWithDroppedLabels))
|
|
})
|
|
_ = metrics.NewGauge(`vm_too_long_label_names_total`, func() float64 {
|
|
return float64(atomic.LoadUint64(&storage.TooLongLabelNames))
|
|
})
|
|
_ = metrics.NewGauge(`vm_too_long_label_values_total`, func() float64 {
|
|
return float64(atomic.LoadUint64(&storage.TooLongLabelValues))
|
|
})
|
|
)
|
|
|
|
func usage() {
|
|
const s = `
|
|
vminsert accepts data via popular data ingestion protocols and routes it to vmstorage nodes configured via -storageNode.
|
|
|
|
See the docs at https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html .
|
|
`
|
|
flagutil.Usage(s)
|
|
}
|
|
|
|
func checkDuplicates(arr []string) string {
|
|
visited := make(map[string]struct{})
|
|
for _, s := range arr {
|
|
if _, ok := visited[s]; ok {
|
|
return s
|
|
}
|
|
visited[s] = struct{}{}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func hasEmptyValues(arr []string) bool {
|
|
for _, s := range arr {
|
|
if s == "" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|