VictoriaMetrics/app/vminsert/relabel/relabel.go
Aliaksandr Valialkin dad13c0a91
lib/streamaggr: follow-up for ff72ca14b9
- Make sure that the last successfully loaded config is used on hot-reload failure
- Properly cleanup resources occupied by already initialized aggregators
  when the current aggregator fails to be initialized
- Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config
  This should simplify monitoring and debugging failed reloads
- Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa
  could be in use at realoadSaConfig()
- Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability
  on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push().
- Remove fine-grained aggregator reload - reload all the aggregators on config change instead.
  This simplifies the code a bit. The fine-grained aggregator reload may be returned back
  if there will be demand from real users for it.
- Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag
- Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
2023-03-31 22:54:10 -07:00

171 lines
5.3 KiB
Go

package relabel
import (
"flag"
"fmt"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/metrics"
)
var (
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
"The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/#relabeling for details. The config is reloaded on SIGHUP signal")
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
)
// Init must be called after flag.Parse and before using the relabel package.
func Init() {
// Register SIGHUP handler for config re-read just before loadRelabelConfig call.
// This guarantees that the config will be re-read if the signal arrives during loadRelabelConfig call.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
pcs, err := loadRelabelConfig()
if err != nil {
logger.Fatalf("cannot load relabelConfig: %s", err)
}
pcsGlobal.Store(pcs)
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
if len(*relabelConfig) == 0 {
return
}
go func() {
for range sighupCh {
configReloads.Inc()
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
pcs, err := loadRelabelConfig()
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
continue
}
pcsGlobal.Store(pcs)
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
}
}()
}
var (
configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`)
configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)
configSuccess = metrics.NewCounter(`vm_relabel_config_last_reload_successful`)
configTimestamp = metrics.NewCounter(`vm_relabel_config_last_reload_success_timestamp_seconds`)
)
var pcsGlobal atomic.Value
// CheckRelabelConfig checks config pointed by -relabelConfig
func CheckRelabelConfig() error {
_, err := loadRelabelConfig()
return err
}
func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
if len(*relabelConfig) == 0 {
return nil, nil
}
pcs, err := promrelabel.LoadRelabelConfigs(*relabelConfig)
if err != nil {
return nil, fmt.Errorf("error when reading -relabelConfig=%q: %w", *relabelConfig, err)
}
return pcs, nil
}
// HasRelabeling returns true if there is global relabeling.
func HasRelabeling() bool {
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
return pcs.Len() > 0 || *usePromCompatibleNaming
}
// Ctx holds relabeling context.
type Ctx struct {
// tmpLabels is used during ApplyRelabeling call.
tmpLabels []prompbmarshal.Label
}
// Reset resets ctx.
func (ctx *Ctx) Reset() {
promrelabel.CleanLabels(ctx.tmpLabels)
ctx.tmpLabels = ctx.tmpLabels[:0]
}
// ApplyRelabeling applies relabeling to the given labels and returns the result.
//
// The returned labels are valid until the next call to ApplyRelabeling.
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
if pcs.Len() == 0 && !*usePromCompatibleNaming {
// There are no relabeling rules.
return labels
}
// Convert labels to prompbmarshal.Label format suitable for relabeling.
tmpLabels := ctx.tmpLabels[:0]
for _, label := range labels {
name := bytesutil.ToUnsafeString(label.Name)
if len(name) == 0 {
name = "__name__"
}
value := bytesutil.ToUnsafeString(label.Value)
tmpLabels = append(tmpLabels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
if *usePromCompatibleNaming {
// Replace unsupported Prometheus chars in label names and metric names with underscores.
for i := range tmpLabels {
label := &tmpLabels[i]
if label.Name == "__name__" {
label.Value = promrelabel.SanitizeName(label.Value)
} else {
label.Name = promrelabel.SanitizeName(label.Name)
}
}
}
if pcs.Len() > 0 {
// Apply relabeling
tmpLabels = pcs.Apply(tmpLabels, 0)
tmpLabels = promrelabel.FinalizeLabels(tmpLabels[:0], tmpLabels)
if len(tmpLabels) == 0 {
metricsDropped.Inc()
}
}
ctx.tmpLabels = tmpLabels
// Return back labels to the desired format.
dst := labels[:0]
for _, label := range tmpLabels {
name := bytesutil.ToUnsafeBytes(label.Name)
if label.Name == "__name__" {
name = nil
}
value := bytesutil.ToUnsafeBytes(label.Value)
dst = append(dst, prompb.Label{
Name: name,
Value: value,
})
}
return dst
}
var metricsDropped = metrics.NewCounter(`vm_relabel_metrics_dropped_total`)