[cluster/vminsert]:add reload -relabelConfig on the request to /-/reload (#3923)

When I use vminsert's `relabelConfig`, I found that now there is no
reloaded api. However, `vminsert` under `VM-Single` has it. So, I hope
to add it to the `cluster/vminster`.

---------

Signed-off-by: z-anshun <1179798460@qq.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
noodles2hg 2024-06-11 01:36:41 +08:00 committed by GitHub
parent 37a8cc0b12
commit 77f22fdb8d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 70 additions and 10 deletions

View file

@ -187,6 +187,8 @@ func main() {
netstorage.MustStop() netstorage.MustStop()
logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())
relabel.Stop()
fs.MustStopDirRemover() fs.MustStopDirRemover()
logger.Infof("the vminsert has been stopped") logger.Infof("the vminsert has been stopped")
@ -384,6 +386,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`) fmt.Fprintf(w, `{}`)
return true return true
case "/-/reload":
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusNoContent)
return true
default: default:
// This is not our link // This is not our link
return false return false

View file

@ -3,7 +3,9 @@ package relabel
import ( import (
"flag" "flag"
"fmt" "fmt"
"sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -22,6 +24,9 @@ var (
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+ usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+ "in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels") "See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
relabelConfigCheckInterval = flag.Duration("relabelConfigCheckInterval", 0, "Interval for checking for changes in '-relabelConfig' file. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
) )
// Init must be called after flag.Parse and before using the relabel package. // Init must be called after flag.Parse and before using the relabel package.
@ -42,18 +47,53 @@ func Init() {
if len(*relabelConfig) == 0 { if len(*relabelConfig) == 0 {
return return
} }
globalStopChan = make(chan struct{})
relabelWG.Add(1)
go func() { go func() {
for range sighupCh { defer relabelWG.Done()
configReloads.Inc() var tickerCh <-chan time.Time
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig) if *relabelConfigCheckInterval > 0 {
pcs, err := loadRelabelConfig() ticker := time.NewTicker(*relabelConfigCheckInterval)
if err != nil { tickerCh = ticker.C
configReloadErrors.Inc() defer ticker.Stop()
configSuccess.Set(0) }
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err) for {
continue select {
case <-sighupCh:
configReloads.Inc()
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
pcsNew, err := loadRelabelConfig()
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
continue
}
if pcsNew.String() == pcs.String() {
logger.Infof("nothing changed in %q", relabelConfig)
continue
}
pcs = pcsNew
pcsGlobal.Store(pcsNew)
case <-tickerCh:
pcsNew, err := loadRelabelConfig()
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
continue
}
if pcsNew.String() == pcs.String() {
continue
}
pcs = pcsNew
pcsGlobal.Store(pcsNew)
case <-globalStopChan:
logger.Infof("stopping relabel")
return
} }
pcsGlobal.Store(pcs)
configSuccess.Set(1) configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp()) configTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig) logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
@ -61,6 +101,20 @@ func Init() {
}() }()
} }
func Stop() {
if len(*relabelConfig) == 0 {
return
}
close(globalStopChan)
relabelWG.Wait()
}
var (
globalStopChan chan struct{}
relabelWG sync.WaitGroup
)
var ( var (
configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`) configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`)
configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`) configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)