2023-01-05 22:03:58 +00:00
package nomad
import (
2023-01-18 05:47:11 +00:00
"context"
2023-01-05 22:03:58 +00:00
"flag"
"fmt"
2023-01-06 05:13:02 +00:00
"net/http"
2023-01-05 22:03:58 +00:00
"os"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// waitTime holds the value of the -promscrape.nomad.waitTime command-line flag.
//
// maxWaitTime honors it only when it is greater than one second and smaller than
// the computed upper bound; otherwise the computed bound is used.
var waitTime = flag . Duration ( "promscrape.nomad.waitTime" , 0 , "Wait time used by Nomad service discovery. Default value is used if not set" )
// apiConfig contains config for API server.
//
// Instances are built by newAPIConfig and must be released with mustStop.
type apiConfig struct {
// tagSeparator is taken from SDConfig.TagSeparator by newAPIConfig; it is "," when that field is nil.
tagSeparator string
// nomadWatcher is the watcher created via newNomadWatcher in newAPIConfig; mustStop stops it.
nomadWatcher * nomadWatcher
}
// mustStop stops the nomadWatcher owned by ac.
func (ac *apiConfig) mustStop() {
	watcher := ac.nomadWatcher
	watcher.mustStop()
}
// configMap is the store getAPIConfig queries (via configMap.Get) to obtain the apiConfig for a given SDConfig.
// NOTE(review): presumably it keeps one apiConfig per SDConfig so newAPIConfig is not re-run on every call — confirm against discoveryutils.ConfigMap.
var configMap = discoveryutils . NewConfigMap ( )
// getAPIConfig returns the apiConfig associated with sdc.
//
// The lookup goes through configMap; newAPIConfig(sdc, baseDir) is handed to it
// as the constructor callback. Any error from configMap.Get is returned as is.
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	build := func() (interface{}, error) {
		return newAPIConfig(sdc, baseDir)
	}
	entry, err := configMap.Get(sdc, build)
	if err != nil {
		return nil, err
	}
	cfg := entry.(*apiConfig)
	return cfg, nil
}
func newAPIConfig ( sdc * SDConfig , baseDir string ) ( * apiConfig , error ) {
hcc := sdc . HTTPClientConfig
2023-01-10 05:14:44 +00:00
token := os . Getenv ( "NOMAD_TOKEN" )
2023-01-05 22:03:58 +00:00
if token != "" {
if hcc . BearerToken != nil {
2023-01-10 05:14:44 +00:00
return nil , fmt . Errorf ( "cannot set both NOMAD_TOKEN and bearer_token" )
2023-01-05 22:03:58 +00:00
}
hcc . BearerToken = promauth . NewSecret ( token )
}
ac , err := hcc . NewConfig ( baseDir )
if err != nil {
return nil , fmt . Errorf ( "cannot parse auth config: %w" , err )
}
apiServer := sdc . Server
if apiServer == "" {
2023-01-10 05:14:44 +00:00
apiServer = os . Getenv ( "NOMAD_ADDR" )
if apiServer == "" {
apiServer = "localhost:4646"
}
2023-01-05 22:03:58 +00:00
}
if ! strings . Contains ( apiServer , "://" ) {
2023-01-10 05:14:44 +00:00
scheme := "http"
if hcc . TLSConfig != nil {
scheme = "https"
2023-01-05 22:03:58 +00:00
}
apiServer = scheme + "://" + apiServer
}
proxyAC , err := sdc . ProxyClientConfig . NewConfig ( baseDir )
if err != nil {
return nil , fmt . Errorf ( "cannot parse proxy auth config: %w" , err )
}
2023-07-06 22:59:56 +00:00
client , err := discoveryutils . NewClient ( apiServer , ac , sdc . ProxyURL , proxyAC , & sdc . HTTPClientConfig )
2023-01-05 22:03:58 +00:00
if err != nil {
return nil , fmt . Errorf ( "cannot create HTTP client for %q: %w" , apiServer , err )
}
tagSeparator := ","
if sdc . TagSeparator != nil {
tagSeparator = * sdc . TagSeparator
}
namespace := sdc . Namespace
if namespace == "" {
namespace = os . Getenv ( "NOMAD_NAMESPACE" )
}
2023-01-10 05:14:44 +00:00
region := sdc . Region
if region == "" {
region = os . Getenv ( "NOMAD_REGION" )
if region == "" {
region = "global"
}
}
nw := newNomadWatcher ( client , sdc , namespace , region )
2023-01-05 22:03:58 +00:00
cfg := & apiConfig {
tagSeparator : tagSeparator ,
nomadWatcher : nw ,
}
return cfg , nil
}
// maxWaitTime is duration for Nomad blocking request.
func maxWaitTime ( ) time . Duration {
d := discoveryutils . BlockingClientReadTimeout
// Nomad adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout.
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
d -= d / 16
2023-02-13 12:27:13 +00:00
// The timeout cannot exceed 10 minutes. See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
2023-01-05 22:03:58 +00:00
if d > 10 * time . Minute {
d = 10 * time . Minute
}
if * waitTime > time . Second && * waitTime < d {
d = * waitTime
}
return d
}
2023-02-13 12:27:13 +00:00
// getBlockingAPIResponse performs blocking request to Nomad via client and returns response.
2023-01-05 22:03:58 +00:00
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries .
2023-01-18 05:47:11 +00:00
func getBlockingAPIResponse ( ctx context . Context , client * discoveryutils . Client , path string , index int64 ) ( [ ] byte , int64 , error ) {
2023-01-05 22:03:58 +00:00
path += "&index=" + strconv . FormatInt ( index , 10 )
path += "&wait=" + fmt . Sprintf ( "%ds" , int ( maxWaitTime ( ) . Seconds ( ) ) )
2023-01-06 05:13:02 +00:00
getMeta := func ( resp * http . Response ) {
ind := resp . Header . Get ( "X-Nomad-Index" )
2023-01-05 22:03:58 +00:00
if len ( ind ) == 0 {
logger . Errorf ( "cannot find X-Nomad-Index header in response from %q" , path )
return
}
2023-01-06 05:13:02 +00:00
newIndex , err := strconv . ParseInt ( ind , 10 , 64 )
2023-01-05 22:03:58 +00:00
if err != nil {
logger . Errorf ( "cannot parse X-Nomad-Index header value in response from %q: %s" , path , err )
return
}
// Properly handle the returned newIndex according to https://developer.hashicorp.com/nomad/api-docs#blocking-queries.
// Index implementation details are the same for Consul and Nomad: https://developer.hashicorp.com/consul/api-docs/features/blocking#implementation-details
if newIndex < 1 {
index = 1
return
}
if index > newIndex {
index = 0
return
}
index = newIndex
}
2023-01-18 05:47:11 +00:00
data , err := client . GetBlockingAPIResponseCtx ( ctx , path , getMeta )
2023-01-05 22:03:58 +00:00
if err != nil {
return nil , index , fmt . Errorf ( "cannot perform blocking Nomad API request at %q: %w" , path , err )
}
return data , index , nil
}