diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 29f037208..69182c609 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -187,6 +187,8 @@ func main() { netstorage.MustStop() logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds()) + relabel.Stop() + fs.MustStopDirRemover() logger.Infof("the vminsert has been stopped") @@ -384,6 +386,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{}`) return true + case "/-/reload": + procutil.SelfSIGHUP() + w.WriteHeader(http.StatusNoContent) + return true default: // This is not our link return false diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 17a969445..c644cf72d 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -3,7 +3,9 @@ package relabel import ( "flag" "fmt" + "sync" "sync/atomic" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -22,6 +24,9 @@ var ( usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+ "in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+ "See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels") + + relabelConfigCheckInterval = flag.Duration("relabelConfigCheckInterval", 0, "Interval for checking for changes in '-relabelConfig' file. "+ + "By default the checking is disabled. Send SIGHUP signal in order to force config check for changes") ) // Init must be called after flag.Parse and before using the relabel package. @@ -42,18 +47,53 @@ func Init() { if len(*relabelConfig) == 0 { return } + + globalStopChan = make(chan struct{}) + relabelWG.Add(1) go func() { - for range sighupCh { - configReloads.Inc() - logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig) - pcs, err := loadRelabelConfig() - if err != nil { - configReloadErrors.Inc() - configSuccess.Set(0) - logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err) - continue + defer relabelWG.Done() + var tickerCh <-chan time.Time + if *relabelConfigCheckInterval > 0 { + ticker := time.NewTicker(*relabelConfigCheckInterval) + tickerCh = ticker.C + defer ticker.Stop() + } + for { + select { + case <-sighupCh: + configReloads.Inc() + logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig) + pcsNew, err := loadRelabelConfig() + if err != nil { + configReloadErrors.Inc() + configSuccess.Set(0) + logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err) + continue + } + if pcsNew.String() == pcs.String() { + logger.Infof("nothing changed in %q", relabelConfig) + continue + } + pcs = pcsNew + pcsGlobal.Store(pcsNew) + case <-tickerCh: + pcsNew, err := loadRelabelConfig() + if err != nil { + configReloadErrors.Inc() + configSuccess.Set(0) + logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err) + continue + } + if pcsNew.String() == pcs.String() { + continue + } + pcs = pcsNew + pcsGlobal.Store(pcsNew) + + case <-globalStopChan: + logger.Infof("stopping relabel") + return } - pcsGlobal.Store(pcs) configSuccess.Set(1) configTimestamp.Set(fasttime.UnixTimestamp()) logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig) @@ -61,6 +101,20 @@ func Init() { }() } +func Stop() { + if len(*relabelConfig) == 0 { + return + } + + close(globalStopChan) + relabelWG.Wait() +} + +var ( + globalStopChan chan struct{} + relabelWG sync.WaitGroup +) + var ( configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`) configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)