Merge branch 'public-single-node' into victorialogs-wip

Aliaksandr Valialkin 2024-06-10 18:24:52 +02:00
commit 9715fcc1ac
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
59 changed files with 3916 additions and 489 deletions

View file

@@ -2762,7 +2762,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-cacheExpireDuration duration
Items are removed from in-memory caches after they aren't accessed for this duration. Lower values may reduce memory usage at the cost of higher CPU usage. See also -prevCacheRemovalPercent (default 30m0s)
-configAuthKey value
Authorization key for accessing /config page. It must be passed via authKey query arg
Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -configAuthKey=file:///abs/path/to/file or -configAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -configAuthKey=http://host/path or -configAuthKey=https://host/path
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
@@ -3076,7 +3076,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-relabelConfig string
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/#relabeling for details. The config is reloaded on SIGHUP signal
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-retentionFilter array
Retention filter in the format 'filter:retention'. For example, '{env="dev"}:3d' configures the retention for time series with env="dev" label to 3 days. See https://docs.victoriametrics.com/#retention-filters for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise/
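The updated help text above says the key must be supplied via the `authKey` query arg. As a hedged illustration, a Go client might pass it like this; the address, endpoint and key value are placeholders, not taken from the commit:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder address and key; substitute your own deployment's values.
	u := url.URL{Scheme: "http", Host: "localhost:8428", Path: "/-/reload"}
	q := u.Query()
	q.Set("authKey", "my-secret-key") // the key travels as the authKey query arg
	u.RawQuery = q.Encode()

	resp, err := http.Get(u.String())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```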

View file

@@ -78,8 +78,8 @@ var (
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flagutil.NewPassword("configAuthKey", "Authorization key for accessing /config page. It must be passed via authKey query arg")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
configAuthKey = flagutil.NewPassword("configAuthKey", "Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")

View file

@@ -19,7 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
)
var reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
var reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
var (
apiLinks = [][2]string{

View file

@@ -37,13 +37,15 @@ var (
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host. "+
"See also -maxConcurrentRequests")
idleConnTimeout = flag.Duration("idleConnTimeout", 50*time.Second, `Defines a duration for idle (keep-alive) connections to exist.
Consider setting this value less than "-http.idleConnTimeout". This helps prevent "write: broken pipe" and "read: connection reset by peer" errors.`)
responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")
maxConcurrentRequests = flag.Int("maxConcurrentRequests", 1000, "The maximum number of concurrent requests vmauth can process. Other requests are rejected with "+
"'429 Too Many Requests' http status code. See also -maxConcurrentPerUserRequests and -maxIdleConnsPerBackend command-line options")
maxConcurrentPerUserRequests = flag.Int("maxConcurrentPerUserRequests", 300, "The maximum number of concurrent requests vmauth can process per each configured user. "+
"Other requests are rejected with '429 Too Many Requests' http status code. See also -maxConcurrentRequests command-line option and max_concurrent_requests option "+
"in per-user config")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend")
@@ -199,10 +201,8 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
isDefault = true
}
maxAttempts := up.getBackendsCount()
if maxAttempts > 1 {
r.Body = &readTrackingBody{
r: r.Body,
}
r.Body = &readTrackingBody{
r: r.Body,
}
for i := 0; i < maxAttempts; i++ {
bu := up.getBackendURL()
@@ -243,8 +243,10 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
req.Host = targetURL.Host
}
updateHeadersByConfig(req.Header, hc.RequestHeaders)
res, err := ui.rt.RoundTrip(req)
var trivialRetries int
rtb, rtbOK := req.Body.(*readTrackingBody)
again:
res, err := ui.rt.RoundTrip(req)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Do not retry canceled or timed out requests
@@ -267,6 +269,12 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
ui.backendErrors.Inc()
return true
}
// Retry trivial network errors once. Such errors may be caused by a misconfigured proxy idle timeout
// or by the OS closing the socket.
if (netutil.IsTrivialNetworkError(err) || errors.Is(err, io.EOF)) && trivialRetries < 1 {
trivialRetries++
goto again
}
// Retry the request if its body wasn't read yet. This usually means that the backend isn't reachable.
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
// NOTE: do not use httpserver.GetRequestURI
@@ -434,6 +442,7 @@ func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, i
tr.ResponseHeaderTimeout = *responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr.DisableCompression = true
tr.IdleConnTimeout = *idleConnTimeout
tr.MaxIdleConnsPerHost = *maxIdleConnsPerBackend
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
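The retry logic above is split across hunks, so here is a self-contained sketch of the same pattern as one function. It is an approximation: `isTrivialNetworkError` is a hypothetical stand-in for the internal `netutil.IsTrivialNetworkError` and may check fewer conditions than the real helper.

```go
package vmauthsketch

import (
	"context"
	"errors"
	"io"
	"net/http"
	"syscall"
)

// isTrivialNetworkError is a stand-in for netutil.IsTrivialNetworkError: it reports
// whether the error looks like a transient connection-level failure.
func isTrivialNetworkError(err error) bool {
	return errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE)
}

// roundTripWithRetry sends the request and retries it once on trivial network
// errors, mirroring the goto-based flow in the hunk above.
func roundTripWithRetry(rt http.RoundTripper, req *http.Request) (*http.Response, error) {
	var trivialRetries int
again:
	res, err := rt.RoundTrip(req)
	if err != nil {
		// Do not retry canceled or timed out requests.
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return nil, err
		}
		if (isTrivialNetworkError(err) || errors.Is(err, io.EOF)) && trivialRetries < 1 {
			trivialRetries++
			goto again
		}
	}
	return res, err
}
```

Note that such a retry is only safe when the request body can be replayed, which is why the earlier hunk in this file now wraps `r.Body` in `readTrackingBody` unconditionally instead of only when more than one backend is configured.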

View file

@@ -5,37 +5,88 @@ package barpool
import (
"fmt"
"io"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/terminal"
)
var isDisabled bool
// Disable sets progress bar to be no-op if v==true
func Disable(v bool) {
isDisabled = v
}
var pool = pb.NewPool()
// Add adds bar to the global pool
func Add(bar *pb.ProgressBar) { pool.Add(bar) }
// Bar is an interface for progress bar
type Bar interface {
Add(value int)
Increment()
Start()
Finish()
NewProxyReader(r io.Reader) *pb.Reader
}
type progressBar struct {
*pb.ProgressBar
}
func (pb *progressBar) Finish() { pb.ProgressBar.Finish() }
func (pb *progressBar) Start() { pb.ProgressBar.Start() }
func (pb *progressBar) Add(value int) { pb.ProgressBar.Add(value) }
func (pb *progressBar) Increment() { pb.ProgressBar.Increment() }
func (pb *progressBar) NewProxyReader(r io.Reader) *pb.Reader {
return pb.ProgressBar.NewProxyReader(r)
}
type progressBarNoOp struct{}
func (pbno *progressBarNoOp) Finish() {}
func (pbno *progressBarNoOp) Start() {}
func (pbno *progressBarNoOp) Add(int) {}
func (pbno *progressBarNoOp) Increment() {}
func (pbno *progressBarNoOp) NewProxyReader(_ io.Reader) *pb.Reader { return nil }
// Start starts the global pool
// Must be called after all progress bars were added
func Start() error { return pool.Start() }
func Start() error {
if isDisabled {
return nil
}
return pool.Start()
}
// Stop stops the global pool
func Stop() { _ = pool.Stop() }
func Stop() {
if isDisabled {
return
}
_ = pool.Stop()
}
// AddWithTemplate adds bar with the given template
// to the global pool
func AddWithTemplate(format string, total int) *pb.ProgressBar {
func AddWithTemplate(format string, total int) Bar {
if isDisabled {
return &progressBarNoOp{}
}
tpl := getTemplate(format)
bar := pb.ProgressBarTemplate(tpl).New(total)
Add(bar)
return bar
pool.Add(bar)
return &progressBar{bar}
}
// NewSingleProgress returns progress bar with given template
func NewSingleProgress(format string, total int) *pb.ProgressBar {
func NewSingleProgress(format string, total int) Bar {
if isDisabled {
return &progressBarNoOp{}
}
tpl := getTemplate(format)
return pb.ProgressBarTemplate(tpl).New(total)
return &progressBar{pb.ProgressBarTemplate(tpl).New(total)}
}
func getTemplate(format string) string {
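A hedged usage sketch of the reworked API: with `Disable(true)` every bar the pool hands out is the no-op implementation, so call sites need no nil checks. The import path comes from the diff; the template and total are invented for illustration.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
)

func main() {
	barpool.Disable(true) // every bar below becomes a no-op

	// AddWithTemplate now returns the Bar interface instead of *pb.ProgressBar.
	bar := barpool.AddWithTemplate(`{{ counters . }}`, 100)
	if err := barpool.Start(); err != nil { // no-op while disabled
		fmt.Println(err)
		return
	}
	for i := 0; i < 100; i++ {
		bar.Increment() // safe to call unconditionally
	}
	bar.Finish()
	barpool.Stop()
}
```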

View file

@@ -10,8 +10,9 @@ import (
)
const (
globalSilent = "s"
globalVerbose = "verbose"
globalSilent = "s"
globalVerbose = "verbose"
globalDisableProgressBar = "disable-progress-bar"
)
var (
@@ -26,6 +27,11 @@ var (
Value: false,
Usage: "Whether to enable verbosity in logs output.",
},
&cli.BoolFlag{
Name: globalDisableProgressBar,
Value: false,
Usage: "Whether to disable progress bar during the import.",
},
}
)
@@ -39,7 +45,6 @@ const (
vmBatchSize = "vm-batch-size"
vmSignificantFigures = "vm-significant-figures"
vmRoundDigits = "vm-round-digits"
vmDisableProgressBar = "vm-disable-progress-bar"
vmCertFile = "vm-cert-file"
vmKeyFile = "vm-key-file"
vmCAFile = "vm-CA-file"
@@ -120,10 +125,6 @@ var (
Usage: "Optional data transfer rate limit in bytes per second.\n" +
"By default, the rate limit is disabled. It can be useful for limiting load on configured via '--vmAddr' destination.",
},
&cli.BoolFlag{
Name: vmDisableProgressBar,
Usage: "Whether to disable progress bar per each worker during the import.",
},
&cli.StringFlag{
Name: vmCertFile,
Usage: "Optional path to client-side TLS certificate file to use when connecting to '--vmAddr'",

View file

@@ -18,14 +18,14 @@ type influxProcessor struct {
separator string
skipDbLabel bool
promMode bool
isSilent bool
isVerbose bool
}
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, silent, verbose bool) *influxProcessor {
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel, promMode, verbose bool) *influxProcessor {
if cc < 1 {
cc = 1
}
return &influxProcessor{
ic: ic,
im: im,
@@ -33,7 +33,6 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
separator: separator,
skipDbLabel: skipDbLabel,
promMode: promMode,
isSilent: silent,
isVerbose: verbose,
}
}
@@ -48,7 +47,7 @@ func (ip *influxProcessor) run() error {
}
question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series))
if !ip.isSilent && !prompt(question) {
if !prompt(question) {
return nil
}

View file

@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
@@ -37,15 +38,23 @@ func main() {
ctx, cancelCtx := context.WithCancel(context.Background())
start := time.Now()
beforeFn := func(c *cli.Context) error {
isSilent = c.Bool(globalSilent)
if c.Bool(globalDisableProgressBar) {
barpool.Disable(true)
}
return nil
}
app := &cli.App{
Name: "vmctl",
Usage: "VictoriaMetrics command-line tool",
Version: buildinfo.Version,
Commands: []*cli.Command{
{
Name: "opentsdb",
Usage: "Migrate time series from OpenTSDB",
Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags),
Name: "opentsdb",
Usage: "Migrate time series from OpenTSDB",
Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags),
Before: beforeFn,
Action: func(c *cli.Context) error {
fmt.Println("OpenTSDB import mode")
@@ -81,22 +90,20 @@ func main() {
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
}
// disable progress bars since openTSDB implementation
// does not use progress bar pool
vmCfg.DisableProgressBar = true
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalSilent), c.Bool(globalVerbose))
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
return otsdbProcessor.run()
},
},
{
Name: "influx",
Usage: "Migrate time series from InfluxDB",
Flags: mergeFlags(globalFlags, influxFlags, vmFlags),
Name: "influx",
Usage: "Migrate time series from InfluxDB",
Flags: mergeFlags(globalFlags, influxFlags, vmFlags),
Before: beforeFn,
Action: func(c *cli.Context) error {
fmt.Println("InfluxDB import mode")
@@ -148,15 +155,15 @@ func main() {
c.String(influxMeasurementFieldSeparator),
c.Bool(influxSkipDatabaseLabel),
c.Bool(influxPrometheusMode),
c.Bool(globalSilent),
c.Bool(globalVerbose))
return processor.run()
},
},
{
Name: "remote-read",
Usage: "Migrate time series via Prometheus remote-read protocol",
Flags: mergeFlags(globalFlags, remoteReadFlags, vmFlags),
Name: "remote-read",
Usage: "Migrate time series via Prometheus remote-read protocol",
Flags: mergeFlags(globalFlags, remoteReadFlags, vmFlags),
Before: beforeFn,
Action: func(c *cli.Context) error {
fmt.Println("Remote-read import mode")
@@ -209,16 +216,16 @@ func main() {
timeReverse: c.Bool(remoteReadFilterTimeReverse),
},
cc: c.Int(remoteReadConcurrency),
isSilent: c.Bool(globalSilent),
isVerbose: c.Bool(globalVerbose),
}
return rmp.run(ctx)
},
},
{
Name: "prometheus",
Usage: "Migrate time series from Prometheus",
Flags: mergeFlags(globalFlags, promFlags, vmFlags),
Name: "prometheus",
Usage: "Migrate time series from Prometheus",
Flags: mergeFlags(globalFlags, promFlags, vmFlags),
Before: beforeFn,
Action: func(c *cli.Context) error {
fmt.Println("Prometheus import mode")
@@ -245,17 +252,19 @@ func main() {
return fmt.Errorf("failed to create prometheus client: %s", err)
}
pp := prometheusProcessor{
cl: cl,
im: importer,
cc: c.Int(promConcurrency),
cl: cl,
im: importer,
cc: c.Int(promConcurrency),
isVerbose: c.Bool(globalVerbose),
}
return pp.run(c.Bool(globalSilent), c.Bool(globalVerbose))
return pp.run()
},
},
{
Name: "vm-native",
Usage: "Migrate time series between VictoriaMetrics installations via native binary format",
Flags: mergeFlags(globalFlags, vmNativeFlags),
Name: "vm-native",
Usage: "Migrate time series between VictoriaMetrics installations via native binary format",
Flags: mergeFlags(globalFlags, vmNativeFlags),
Before: beforeFn,
Action: func(c *cli.Context) error {
fmt.Println("VictoriaMetrics Native import mode")
@@ -344,7 +353,6 @@ func main() {
backoff: backoff.New(),
cc: c.Int(vmConcurrency),
disablePerMetricRequests: c.Bool(vmNativeDisablePerMetricMigration),
isSilent: c.Bool(globalSilent),
isNative: !c.Bool(vmNativeDisableBinaryProtocol),
}
return p.run(ctx)
@@ -360,6 +368,7 @@ func main() {
Value: false,
},
},
Before: beforeFn,
Action: func(c *cli.Context) error {
common.StartUnmarshalWorkers()
blockPath := c.Args().First()
@@ -433,6 +442,5 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
RoundDigits: c.Int(vmRoundDigits),
ExtraLabels: c.StringSlice(vmExtraLabel),
RateLimit: c.Int64(vmRateLimit),
DisableProgressBar: c.Bool(vmDisableProgressBar),
}, nil
}
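Every command above now registers the same `beforeFn` via the `Before` field, so global flags take effect before any command's `Action` runs. A minimal standalone illustration of this urfave/cli pattern follows; the command and flag names are invented for the example.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	var disableProgressBar bool
	// Runs before a command's Action, like beforeFn in the diff above.
	beforeFn := func(c *cli.Context) error {
		disableProgressBar = c.Bool("disable-progress-bar")
		return nil
	}
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.BoolFlag{Name: "disable-progress-bar"},
		},
		Commands: []*cli.Command{{
			Name:   "demo",
			Before: beforeFn,
			Action: func(c *cli.Context) error {
				fmt.Println("progress bar disabled:", disableProgressBar)
				return nil
			},
		}},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```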

View file

@@ -15,7 +15,6 @@ type otsdbProcessor struct {
oc *opentsdb.Client
im *vm.Importer
otsdbcc int
isSilent bool
isVerbose bool
}
@@ -26,7 +25,7 @@ type queryObj struct {
StartTime int64
}
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent, verbose bool) *otsdbProcessor {
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, verbose bool) *otsdbProcessor {
if otsdbcc < 1 {
otsdbcc = 1
}
@@ -34,7 +33,6 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, silent
oc: oc,
im: im,
otsdbcc: otsdbcc,
isSilent: silent,
isVerbose: verbose,
}
}
@@ -55,7 +53,7 @@ func (op *otsdbProcessor) run() error {
}
question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics))
if !op.isSilent && !prompt(question) {
if !prompt(question) {
return nil
}
op.im.ResetStats()

View file

@@ -5,11 +5,12 @@ import (
"log"
"sync"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
type prometheusProcessor struct {
@@ -24,9 +25,12 @@ type prometheusProcessor struct {
// and defines number of concurrently
// running snapshot block readers
cc int
// isVerbose enables verbose output
isVerbose bool
}
func (pp *prometheusProcessor) run(silent, verbose bool) error {
func (pp *prometheusProcessor) run() error {
blocks, err := pp.cl.Explore()
if err != nil {
return fmt.Errorf("explore failed: %s", err)
@@ -35,12 +39,11 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
return fmt.Errorf("found no blocks to import")
}
question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks))
if !silent && !prompt(question) {
if !prompt(question) {
return nil
}
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
if err := barpool.Start(); err != nil {
return err
}
@@ -72,7 +75,7 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
return fmt.Errorf("prometheus error: %s", promErr)
case vmErr := <-pp.im.Errors():
close(blockReadersCh)
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
case blockReadersCh <- br:
}
}
@@ -85,7 +88,7 @@ func (pp *prometheusProcessor) run(silent, verbose bool) error {
// drain import errors channel
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
}
}
for err := range errCh {

View file

@@ -25,6 +25,9 @@ const (
// This test simulates the close process when the user aborts it
func Test_prometheusProcessor_run(t *testing.T) {
t.Skip()
defer func() { isSilent = false }()
type fields struct {
cfg prometheus.Config
vmCfg vm.Config
@@ -117,11 +120,12 @@ func Test_prometheusProcessor_run(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
client := tt.fields.cl(tt.fields.cfg)
importer := tt.fields.im(tt.fields.vmCfg)
isSilent = tt.args.silent
pp := &prometheusProcessor{
cl: client,
im: importer,
cc: tt.fields.cc,
cl: client,
im: importer,
cc: tt.fields.cc,
isVerbose: tt.args.verbose,
}
// we should answer the prompt
@@ -157,7 +161,7 @@ func Test_prometheusProcessor_run(t *testing.T) {
go tt.fields.closer(importer)
}
if err := pp.run(tt.args.silent, tt.args.verbose); (err != nil) != tt.wantErr {
if err := pp.run(); (err != nil) != tt.wantErr {
t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
}
})

View file

@@ -6,14 +6,21 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/prometheus/prometheus/prompb"
)
func TestRemoteRead(t *testing.T) {
barpool.Disable(true)
defer func() {
barpool.Disable(false)
}()
defer func() { isSilent = false }()
var testCases = []struct {
name string
@@ -31,7 +38,7 @@ func TestRemoteRead(t *testing.T) {
{
name: "step minute on minute time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1},
start: "2022-11-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
@@ -62,7 +69,7 @@ func TestRemoteRead(t *testing.T) {
{
name: "step month on month time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true,
vmCfg: vm.Config{Addr: "", Concurrency: 1,
Transport: http.DefaultTransport.(*http.Transport)},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
@@ -157,7 +164,6 @@ func TestRemoteRead(t *testing.T) {
chunk: tt.chunk,
},
cc: 1,
isSilent: true,
isVerbose: false,
}
@@ -170,6 +176,11 @@
}
func TestSteamRemoteRead(t *testing.T) {
barpool.Disable(true)
defer func() {
barpool.Disable(false)
}()
defer func() { isSilent = false }()
var testCases = []struct {
name string
@@ -187,7 +198,7 @@ func TestSteamRemoteRead(t *testing.T) {
{
name: "step minute on minute time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1},
start: "2022-11-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
@@ -218,7 +229,7 @@ func TestSteamRemoteRead(t *testing.T) {
{
name: "step month on month time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
@@ -312,7 +323,6 @@ func TestSteamRemoteRead(t *testing.T) {
chunk: tt.chunk,
},
cc: 1,
isSilent: true,
isVerbose: false,
}

View file

@@ -7,8 +7,6 @@ import (
"sync"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
@@ -22,7 +20,6 @@ type remoteReadProcessor struct {
src *remoteread.Client
cc int
isSilent bool
isVerbose bool
}
@@ -50,21 +47,17 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
rrp.filter.timeStart.String(), rrp.filter.timeEnd.String(), len(ranges), rrp.filter.chunk)
if !rrp.isSilent && !prompt(question) {
if !prompt(question) {
return nil
}
var bar *pb.ProgressBar
if !rrp.isSilent {
bar = barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
if err := barpool.Start(); err != nil {
return err
}
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
if err := barpool.Start(); err != nil {
return err
}
defer func() {
if !rrp.isSilent {
barpool.Stop()
}
barpool.Stop()
log.Println("Import finished!")
log.Print(rrp.dst.Stats())
}()
@@ -82,9 +75,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
errCh <- fmt.Errorf("request failed for: %s", err)
return
}
if bar != nil {
bar.Increment()
}
bar.Increment()
}
}()
}

View file

@@ -12,7 +12,13 @@ import (
const barTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
// isSilent should be initialized in main
var isSilent bool
func prompt(question string) bool {
if isSilent {
return true
}
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if !isTerminal {
return true
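With the `--s` check moved inside `prompt` itself, silent mode is applied in exactly one place; that is what lets the other hunks in this commit delete the per-processor `isSilent` fields and the scattered `!isSilent &&` checks at call sites.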

View file

@@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/cheggaaa/pb/v3"
)
// Config contains list of params to configure
@@ -55,8 +54,6 @@ type Config struct {
// RateLimit defines a data transfer speed in bytes per second.
// Is applied to each worker (see Concurrency) independently.
RateLimit int64
// Whether to disable progress bar per VM worker
DisableProgressBar bool
}
// Importer performs insertion of timeseries
@@ -159,12 +156,10 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
im.wg.Add(int(cfg.Concurrency))
for i := 0; i < int(cfg.Concurrency); i++ {
var bar *pb.ProgressBar
if !cfg.DisableProgressBar {
pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i)
bar = barpool.AddWithTemplate(pbPrefix+pbTpl, 0)
}
go func(bar *pb.ProgressBar) {
pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i)
bar := barpool.AddWithTemplate(pbPrefix+pbTpl, 0)
go func(bar barpool.Bar) {
defer im.wg.Done()
im.startWorker(ctx, bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
}(bar)
@@ -217,7 +212,7 @@ func (im *Importer) Close() {
})
}
func (im *Importer) startWorker(ctx context.Context, bar *pb.ProgressBar, batchSize, significantFigures, roundDigits int) {
func (im *Importer) startWorker(ctx context.Context, bar barpool.Bar, batchSize, significantFigures, roundDigits int) {
var batch []*TimeSeries
var dataPoints int
var waitForBatch time.Time
@@ -252,9 +247,7 @@ func (im *Importer) startWorker(ctx context.Context, bar *pb.ProgressBar, batchS
batch = append(batch, ts)
dataPoints += len(ts.Values)
if bar != nil {
bar.Add(len(ts.Values))
}
bar.Add(len(ts.Values))
if dataPoints < batchSize {
continue
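The deleted `if bar != nil` guard works because `barpool` now returns a no-op implementation instead of nil when bars are disabled: the null-object pattern. A minimal self-contained illustration (all names here are invented; only the shape mirrors the diff):

```go
package main

import "fmt"

// Bar is the minimal surface the importer needs, as in the barpool diff above.
type Bar interface {
	Add(value int)
}

type realBar struct{ total int }

func (b *realBar) Add(value int) {
	b.total += value
	fmt.Println("progress:", b.total)
}

// noOpBar satisfies Bar but does nothing, so callers never branch on nil.
type noOpBar struct{}

func (noOpBar) Add(int) {}

func newBar(disabled bool) Bar {
	if disabled {
		return noOpBar{}
	}
	return &realBar{}
}

func main() {
	for _, disabled := range []bool{false, true} {
		bar := newBar(disabled)
		bar.Add(10) // safe either way; no `if bar != nil` needed
	}
}
```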

View file

@@ -9,8 +9,6 @@ import (
"sync"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
@@ -33,7 +31,6 @@ type vmNativeProcessor struct {
rateLimit int64
interCluster bool
cc int
isSilent bool
isNative bool
disablePerMetricRequests bool
@@ -82,13 +79,13 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
return fmt.Errorf("failed to get tenants: %w", err)
}
question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants)
if !p.isSilent && !prompt(question) {
if !prompt(question) {
return nil
}
}
for _, tenantID := range tenants {
err := p.runBackfilling(ctx, tenantID, ranges, p.isSilent)
err := p.runBackfilling(ctx, tenantID, ranges)
if err != nil {
return fmt.Errorf("migration failed: %s", err)
}
@@ -100,7 +97,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
return nil
}
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar barpool.Bar) error {
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) }
attempts, err := p.backoff.Retry(ctx, retryableFunc)
@@ -114,15 +111,18 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
return nil
}
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar barpool.Bar) error {
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err)
}
if p.disablePerMetricRequests && bar != nil {
fmt.Printf("Continue import process with filter %s:\n", f.String())
reader = bar.NewProxyReader(reader)
if p.disablePerMetricRequests {
pr := bar.NewProxyReader(reader)
if pr != nil {
reader = pr
fmt.Printf("Continue import process with filter %s:\n", f.String())
}
}
pr, pw := io.Pipe()
@@ -155,7 +155,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
return <-importCh
}
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error {
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time) error {
exportAddr := nativeExportAddr
importAddr := nativeImportAddr
if p.isNative {
@@ -194,7 +194,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
"": ranges,
}
if !p.disablePerMetricRequests {
metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent)
metrics, err = p.explore(ctx, p.src, tenantID, ranges)
if err != nil {
return fmt.Errorf("failed to explore metric names: %s", err)
}
@@ -216,26 +216,24 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
// do not prompt for intercluster because there could be many tenants,
// and we don't want to interrupt the process when moving to the next tenant.
question := foundSeriesMsg + ". Continue?"
if !silent && !prompt(question) {
if !prompt(question) {
return nil
}
} else {
log.Print(foundSeriesMsg)
}
var bar *pb.ProgressBar
barPrefix := "Requests to make"
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
}
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake)
if p.disablePerMetricRequests {
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
}
bar.Start()
defer bar.Finish()
bar := barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake)
if p.disablePerMetricRequests {
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
}
bar.Start()
defer bar.Finish()
filterCh := make(chan native.Filter)
errCh := make(chan error, p.cc)
@@ -251,9 +249,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
errCh <- err
return
}
if bar != nil {
bar.Increment()
}
bar.Increment()
} else {
if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil {
errCh <- err
@@ -298,15 +294,12 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
return nil
}
func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) {
func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time) (map[string][][]time.Time, error) {
log.Printf("Exploring metrics...")
var bar *pb.ProgressBar
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges))
bar.Start()
defer bar.Finish()
}
bar := barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges))
bar.Start()
defer bar.Finish()
metrics := make(map[string][][]time.Time)
for _, r := range ranges {
@@ -317,9 +310,7 @@ func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, ten
for i := range ms {
metrics[ms[i]] = append(metrics[ms[i]], r)
}
if bar != nil {
bar.Increment()
}
bar.Increment()
}
return metrics, nil
}

View file

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"log"
"net/http"
"os"
@@ -37,6 +38,12 @@ func Test_vmNativeProcessor_run(t *testing.T) {
}
}()
barpool.Disable(true)
defer func() {
barpool.Disable(false)
}()
defer func() { isSilent = false }()
type fields struct {
filter native.Filter
dst *native.Client
@@ -218,6 +225,7 @@ func Test_vmNativeProcessor_run(t *testing.T) {
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
}
isSilent = tt.args.silent
p := &vmNativeProcessor{
filter: tt.fields.filter,
dst: tt.fields.dst,
@@ -227,7 +235,6 @@ func Test_vmNativeProcessor_run(t *testing.T) {
rateLimit: tt.fields.rateLimit,
interCluster: tt.fields.interCluster,
cc: tt.fields.cc,
isSilent: tt.args.silent,
isNative: true,
}

View file

@@ -73,8 +73,8 @@ var (
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flagutil.NewPassword("configAuthKey", "Authorization key for accessing /config page. It must be passed via authKey query arg")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
configAuthKey = flagutil.NewPassword("configAuthKey", "Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
reloadAuthKey = flagutil.NewPassword("reloadAuthKey", "Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented")
)

View file

@@ -11,7 +11,6 @@
"@types/lodash.debounce": "^4.0.6",
"@types/lodash.get": "^4.4.6",
"@types/lodash.throttle": "^4.1.6",
"@types/marked": "^5.0.0",
"@types/node": "^20.4.0",
"@types/qs": "^6.9.7",
"@types/react-input-mask": "^3.0.2",
@@ -22,7 +21,8 @@
"lodash.debounce": "^4.0.8",
"lodash.get": "^4.4.2",
"lodash.throttle": "^4.1.1",
"marked": "^5.1.0",
"marked": "^12.0.2",
"marked-emoji": "^1.4.0",
"preact": "^10.7.1",
"qs": "^6.10.3",
"react-input-mask": "^2.0.4",
@@ -4272,11 +4272,6 @@
"@types/lodash": "*"
}
},
"node_modules/@types/marked": {
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/@types/marked/-/marked-5.0.2.tgz",
"integrity": "sha512-OucS4KMHhFzhz27KxmWg7J+kIYqyqoW5kdIEI319hqARQQUTqhao3M/F+uFnDXD0Rg72iDDZxZNxq5gvctmLlg=="
},
"node_modules/@types/mime": {
"version": "1.3.5",
"resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz",
@@ -13598,14 +13593,22 @@
}
},
"node_modules/marked": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/marked/-/marked-5.1.2.tgz",
"integrity": "sha512-ahRPGXJpjMjwSOlBoTMZAK7ATXkli5qCPxZ21TG44rx1KEo44bii4ekgTDQPNRQ4Kh7JMb9Ub1PVk1NxRSsorg==",
"version": "12.0.2",
"resolved": "https://registry.npmjs.org/marked/-/marked-12.0.2.tgz",
"integrity": "sha512-qXUm7e/YKFoqFPYPa3Ukg9xlI5cyAtGmyEIzMfW//m6kXwCy2Ps9DYf5ioijFKQ8qyuscrHoY04iJGctu2Kg0Q==",
"bin": {
"marked": "bin/marked.js"
},
"engines": {
"node": ">= 16"
"node": ">= 18"
}
},
"node_modules/marked-emoji": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/marked-emoji/-/marked-emoji-1.4.0.tgz",
"integrity": "sha512-/2TJfGzXpiBBq+X3akHHbTrAjZPJDwR+7FV6SyQLECnQEfaoVkrpKZJzHhPTAq3Sl/A1l2frMT0u6b38VBBlNg==",
"peerDependencies": {
"marked": ">=4 <13"
}
},
"node_modules/mdn-data": {

View file

@@ -7,7 +7,6 @@
"@types/lodash.debounce": "^4.0.6",
"@types/lodash.get": "^4.4.6",
"@types/lodash.throttle": "^4.1.6",
"@types/marked": "^5.0.0",
"@types/node": "^20.4.0",
"@types/qs": "^6.9.7",
"@types/react-input-mask": "^3.0.2",
@@ -18,7 +17,8 @@
"lodash.debounce": "^4.0.8",
"lodash.get": "^4.4.2",
"lodash.throttle": "^4.1.1",
"marked": "^5.1.0",
"marked": "^12.0.2",
"marked-emoji": "^1.4.0",
"preact": "^10.7.1",
"qs": "^6.10.3",
"react-input-mask": "^2.0.4",

View file

@@ -4,6 +4,7 @@ import AppContextProvider from "./contexts/AppContextProvider";
import ThemeProvider from "./components/Main/ThemeProvider/ThemeProvider";
import ExploreLogs from "./pages/ExploreLogs/ExploreLogs";
import LogsLayout from "./layouts/LogsLayout/LogsLayout";
import "./constants/markedPlugins";
const AppLogs: FC = () => {
const [loadedTheme, setLoadedTheme] = useState(false);

File diff suppressed because it is too large

View file

@@ -0,0 +1,5 @@
import { markedEmoji } from "marked-emoji";
import { marked } from "marked";
import emojis from "./emojis";
marked.use(markedEmoji({ emojis, renderer: (token) => token.emoji }));

View file

@@ -54,7 +54,7 @@ const useGetMetricsQL = () => {
const processMarkdown = (text: string) => {
const div = document.createElement("div");
div.innerHTML = marked(text);
div.innerHTML = marked(text) as string;
const groups = div.querySelectorAll(`${CATEGORY_TAG}, ${FUNCTION_TAG}`);
return processGroups(groups);
};
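The `as string` cast above is needed because `marked()` is typed as returning `string | Promise<string>` in v12: rendering can be asynchronous when the `async` option is enabled, while the synchronous defaults used here always yield a string.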

View file

@@ -26,6 +26,7 @@ const ExploreLogs: FC = () => {
const { logs, isLoading, error, fetchLogs } = useFetchLogs(serverUrl, query, limit);
const [queryError, setQueryError] = useState<ErrorTypes | string>("");
const [loaded, isLoaded] = useState(false);
const [markdownParsing, setMarkdownParsing] = useState(getFromStorage("LOGS_MARKDOWN") === "true");
const handleRunQuery = () => {
if (!query) {
@@ -51,6 +52,11 @@ const ExploreLogs: FC = () => {
saveToStorage("LOGS_LIMIT", `${limit}`);
};
const handleChangeMarkdownParsing = (val: boolean) => {
saveToStorage("LOGS_MARKDOWN", `${val}`);
setMarkdownParsing(val);
};
useEffect(() => {
if (query) handleRunQuery();
}, [period]);
@@ -65,15 +71,18 @@ const ExploreLogs: FC = () => {
query={query}
error={queryError}
limit={limit}
markdownParsing={markdownParsing}
onChange={setQuery}
onChangeLimit={handleChangeLimit}
onRun={handleRunQuery}
onChangeMarkdownParsing={handleChangeMarkdownParsing}
/>
{isLoading && <Spinner />}
{error && <Alert variant="error">{error}</Alert>}
<ExploreLogsBody
data={logs}
loaded={loaded}
markdownParsing={markdownParsing}
/>
</div>
);

View file

@@ -15,10 +15,12 @@ import useBoolean from "../../../hooks/useBoolean";
import TableLogs from "./TableLogs";
import GroupLogs from "../GroupLogs/GroupLogs";
import { DATE_TIME_FORMAT } from "../../../constants/date";
import { marked } from "marked";
export interface ExploreLogBodyProps {
data: Logs[];
loaded?: boolean;
markdownParsing: boolean;
}
enum DisplayType {
@@ -33,7 +35,7 @@ const tabs = [
{ label: "JSON", value: DisplayType.json, icon: <CodeIcon/> },
];
const ExploreLogsBody: FC<ExploreLogBodyProps> = ({ data, loaded }) => {
const ExploreLogsBody: FC<ExploreLogBodyProps> = ({ data, loaded, markdownParsing }) => {
const { isMobile } = useDeviceDetect();
const { timezone } = useTimeState();
const { setSearchParamsFromKeys } = useSearchParamsFromObject();
@@ -46,11 +48,12 @@ const ExploreLogsBody: FC<ExploreLogBodyProps> = ({ data, loaded }) => {
...item,
_vmui_time: item._time ? dayjs(item._time).tz().format(`${DATE_TIME_FORMAT}.SSS`) : "",
_vmui_data: JSON.stringify(item, null, 2),
_vmui_markdown: marked(item._msg.replace(/```/g, "\n```\n")) as string,
})) as Logs[], [data, timezone]);
const columns = useMemo(() => {
if (!logs?.length) return [];
const hideColumns = ["_vmui_data", "_vmui_time"];
const hideColumns = ["_vmui_data", "_vmui_time", "_vmui_markdown"];
const keys = new Set<string>();
for (const item of logs) {
for (const key in item) {
@@ -125,6 +128,7 @@ const ExploreLogsBody: FC<ExploreLogBodyProps> = ({ data, loaded }) => {
<GroupLogs
logs={logs}
columns={columns}
markdownParsing={markdownParsing}
/>
)}
{activeTab === DisplayType.json && (

View file

@@ -6,17 +6,29 @@ import useDeviceDetect from "../../../hooks/useDeviceDetect";
import Button from "../../../components/Main/Button/Button";
import QueryEditor from "../../../components/Configurators/QueryEditor/QueryEditor";
import TextField from "../../../components/Main/TextField/TextField";
import Switch from "../../../components/Main/Switch/Switch";
export interface ExploreLogHeaderProps {
query: string;
limit: number;
error?: string;
markdownParsing: boolean;
onChange: (val: string) => void;
onChangeLimit: (val: number) => void;
onRun: () => void;
onChangeMarkdownParsing: (val: boolean) => void;
}
const ExploreLogsHeader: FC<ExploreLogHeaderProps> = ({ query, limit, error, onChange, onChangeLimit, onRun }) => {
const ExploreLogsHeader: FC<ExploreLogHeaderProps> = ({
query,
limit,
error,
markdownParsing,
onChange,
onChangeLimit,
onRun,
onChangeMarkdownParsing,
}) => {
const { isMobile } = useDeviceDetect();
const [errorLimit, setErrorLimit] = useState("");
@@ -66,6 +78,14 @@ const ExploreLogsHeader: FC<ExploreLogHeaderProps> = ({ query, limit, error, onC
/>
</div>
<div className="vm-explore-logs-header-bottom">
<div className="vm-explore-logs-header-bottom-contols">
<Switch
label={"Markdown parsing"}
value={markdownParsing}
onChange={onChangeMarkdownParsing}
fullWidth={isMobile}
/>
</div>
<div className="vm-explore-logs-header-bottom-helpful">
<a
className="vm-link vm-link_with-icon"

View file

@@ -25,6 +25,10 @@
justify-content: normal;
}
&-contols {
flex-grow: 1;
}
&__execute {
display: grid;
}

View file

@@ -11,9 +11,10 @@ import GroupLogsItem from "./GroupLogsItem";
interface TableLogsProps {
logs: Logs[];
columns: string[];
markdownParsing: boolean;
}
const GroupLogs: FC<TableLogsProps> = ({ logs }) => {
const GroupLogs: FC<TableLogsProps> = ({ logs, markdownParsing }) => {
const copyToClipboard = useCopyToClipboard();
const [copied, setCopied] = useState<string | null>(null);
@@ -77,6 +78,7 @@ const GroupLogs: FC<TableLogsProps> = ({ logs }) => {
<GroupLogsItem
key={`${value._msg}${value._time}`}
log={value}
markdownParsing={markdownParsing}
/>
))}
</div>

View file

@@ -10,15 +10,16 @@ import classNames from "classnames";
interface Props {
log: Logs;
markdownParsing: boolean;
}
const GroupLogsItem: FC<Props> = ({ log }) => {
const GroupLogsItem: FC<Props> = ({ log, markdownParsing }) => {
const {
value: isOpenFields,
toggle: toggleOpenFields,
} = useBoolean(false);
const excludeKeys = ["_stream", "_msg", "_time", "_vmui_time", "_vmui_data"];
const excludeKeys = ["_stream", "_msg", "_time", "_vmui_time", "_vmui_data", "_vmui_markdown"];
const fields = Object.entries(log).filter(([key]) => !excludeKeys.includes(key));
const hasFields = fields.length > 0;
@@ -71,6 +72,7 @@ const GroupLogsItem: FC<Props> = ({ log }) => {
"vm-group-logs-row-content__msg": true,
"vm-group-logs-row-content__msg_missing": !log._msg
})}
dangerouslySetInnerHTML={markdownParsing && log._vmui_markdown ? { __html: log._vmui_markdown } : undefined}
>
{log._msg || "message missing"}
</div>

View file

@@ -96,6 +96,65 @@
font-style: italic;
text-align: center;
}
/* styles for markdown */
p, pre, code {
white-space: pre-wrap;
word-wrap: break-word;
word-break: normal;
}
code:not(pre code), pre {
display: inline-block;
background: $color-hover-black;
border: 1px solid $color-hover-black;
border-radius: $border-radius-small;
tab-size: 4;
font-variant-ligatures: none;
margin: calc($padding-small/4) 0;
}
p {
font-family: $font-family-global;
line-height: 1.4;
}
pre {
padding: $padding-small;
}
code {
font-size: $font-size-small;
padding: calc($padding-small / 4) calc($padding-small / 2);
}
a {
color: $color-primary;
cursor: pointer;
&:hover {
text-decoration: underline;
}
}
strong {
font-weight: bold;
}
em {
font-style: italic;
}
blockquote {
border-left: 4px solid $color-hover-black;
margin: calc($padding-small/2) $padding-small;
padding: calc($padding-small/2) $padding-small;
}
ul, ol {
list-style-position: inside;
}
/* end styles for markdown */
}
}

View file

@@ -89,7 +89,7 @@ const PredefinedPanel: FC<PredefinedPanelsProps> = ({
<>
<div>
<span>Description:</span>
<div dangerouslySetInnerHTML={{ __html: marked.parse(description) }}/>
<div dangerouslySetInnerHTML={{ __html: marked(description) as string }}/>
</div>
<hr/>
</>

View file

@@ -7,6 +7,7 @@ export type StorageKeys = "AUTOCOMPLETE"
| "DISABLED_DEFAULT_TIMEZONE"
| "THEME"
| "LOGS_LIMIT"
| "LOGS_MARKDOWN"
| "EXPLORE_METRICS_TIPS"
| "QUERY_HISTORY"
| "QUERY_FAVORITES"

File diff suppressed because it is too large

File diff suppressed because it is too large

View file

@@ -135,3 +135,23 @@ groups:
summary: "Configuration reload failed for vmagent instance {{ $labels.instance }}"
description: "Configuration hot-reload failed for vmagent on instance {{ $labels.instance }}.
Check vmagent's logs for detailed error message."
- alert: StreamAggrFlushTimeout
expr: |
increase(vm_streamaggr_flush_timeouts_total[5m]) > 0
labels:
severity: warning
annotations:
summary: "Streaming aggregation at \"{{ $labels.job }}\" (instance {{ $labels.instance }}) can't be finished within the configured aggregation interval."
description: "Stream aggregation process can't keep up with the load and might produce incorrect aggregation results. Check logs for more details.
Possible solutions: increase aggregation interval; aggregate smaller number of series; reduce samples' ingestion rate to stream aggregation."
- alert: StreamAggrDedupFlushTimeout
expr: |
increase(vm_streamaggr_dedup_flush_timeouts_total[5m]) > 0
labels:
severity: warning
annotations:
summary: "Deduplication \"{{ $labels.job }}\" (instance {{ $labels.instance }}) can't be finished within configured deduplication interval."
description: "Deduplication process can't keep up with the load and might produce incorrect results. Check docs https://docs.victoriametrics.com/stream-aggregation/#deduplication and logs for more details.
Possible solutions: increase deduplication interval; deduplicate smaller number of series; reduce samples' ingestion rate."

View file

@@ -40,7 +40,7 @@ groups:
Check vmalert's logs for detailed error message."
- alert: RecordingRulesNoData
expr: sum(vmalert_recording_rules_last_evaluation_samples) without(recording, id) < 1
expr: sum(vmalert_recording_rules_last_evaluation_samples) without(id) < 1
for: 30m
labels:
severity: info

View file

@@ -1,4 +1,3 @@
version: '3.5'
services:
# Metrics collector.
# It scrapes targets defined in --promscrape.config

View file

@@ -1,4 +1,3 @@
version: "3.5"
services:
# Grafana instance configured with VictoriaLogs as datasource
grafana:

View file

@@ -1,4 +1,3 @@
version: "3.5"
services:
# Metrics collector.
# It scrapes targets defined in --promscrape.config

View file

@@ -1,4 +1,3 @@
version: "3.5"
services:
grafana:
container_name: grafana

View file

@@ -1,4 +1,3 @@
version: "3.5"
services:
grafana:
container_name: grafana

View file

@@ -30,6 +30,16 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
**Update note 1: the `--vm-disable-progress-bar` command-line flag in `vmctl` is deprecated. Use `--disable-progress-bar` instead.**
* FEATURE: [alerts-vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmagent.yml): add new alerting rules `StreamAggrFlushTimeout` and `StreamAggrDedupFlushTimeout` to notify about issues during stream aggregation.
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add row `Streaming aggregation` with panels related to [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/) process.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): add `-idleConnTimeout` command-line flag (set to 50s by default). It should reduce the probability of `broken pipe` and `connection reset by peer` errors in vmauth logs.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): automatically retry a request to the configured backends once on trivial network errors, such as `broken pipe` and `connection reset`.
* BUGFIX: all VictoriaMetrics components: prioritize `-configAuthKey` and `-reloadAuthKey` over `-httpAuth.*` settings. This change aligns the behavior of these flags with other auth flags such as `-metricsAuthKey`, `-flagsAuthKey` and `-pprofAuthKey`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--disable-progress-bar` global command-line flag. It can be used to disable the dynamic progress bar in all migration modes. The `--vm-disable-progress-bar` command-line flag is deprecated and will be removed in future releases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6367).
## [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1)
Released at 2024-06-07

View file

@@ -2765,7 +2765,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-cacheExpireDuration duration
Items are removed from in-memory caches after they aren't accessed for this duration. Lower values may reduce memory usage at the cost of higher CPU usage. See also -prevCacheRemovalPercent (default 30m0s)
-configAuthKey value
Authorization key for accessing /config page. It must be passed via authKey query arg
Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -configAuthKey=file:///abs/path/to/file or -configAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -configAuthKey=http://host/path or -configAuthKey=https://host/path
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
@@ -3079,7 +3079,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-relabelConfig string
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/#relabeling for details. The config is reloaded on SIGHUP signal
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-retentionFilter array
Retention filter in the format 'filter:retention'. For example, '{env="dev"}:3d' configures the retention for time series with env="dev" label to 3 days. See https://docs.victoriametrics.com/#retention-filters for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise/

View file

@@ -2773,7 +2773,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-cacheExpireDuration duration
Items are removed from in-memory caches after they aren't accessed for this duration. Lower values may reduce memory usage at the cost of higher CPU usage. See also -prevCacheRemovalPercent (default 30m0s)
-configAuthKey value
Authorization key for accessing /config page. It must be passed via authKey query arg
Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -configAuthKey=file:///abs/path/to/file or -configAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -configAuthKey=http://host/path or -configAuthKey=https://host/path
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
@@ -3087,7 +3087,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-relabelConfig string
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/#relabeling for details. The config is reloaded on SIGHUP signal
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-retentionFilter array
Retention filter in the format 'filter:retention'. For example, '{env="dev"}:3d' configures the retention for time series with env="dev" label to 3 days. See https://docs.victoriametrics.com/#retention-filters for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise/

View file

@@ -20,6 +20,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: disallow unescaped `!` char in [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) queries, since it permits writing an incorrect query that may look like a correct one. For example, `foo!:bar` instead of `foo:!bar`.
* FEATURE: [web UI](https://docs.victoriametrics.com/VictoriaLogs/querying/#web-ui): add markdown support to the `Group` view. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6292).
## [v0.18.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.18.0-victorialogs)

View file

@@ -9,6 +9,8 @@ menu:
title: Quick Start
aliases:
- /VictoriaLogs/QuickStart.html
- /victorialogs/quick-start.html
- /victorialogs/quick-start/
---
# VictoriaLogs Quick Start

View file

@@ -1637,7 +1637,7 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
-cacheExpireDuration duration
Items are removed from in-memory caches after they aren't accessed for this duration. Lower values may reduce memory usage at the cost of higher CPU usage. See also -prevCacheRemovalPercent (default 30m0s)
-configAuthKey value
Authorization key for accessing /config page. It must be passed via authKey query arg
Authorization key for accessing /config page. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -configAuthKey=file:///abs/path/to/file or -configAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -configAuthKey=http://host/path or -configAuthKey=https://host/path
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
@@ -2006,7 +2006,7 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-remoteWrite.aws.accessKey array
Optional AWS AccessKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set

View file

@ -6,6 +6,8 @@ menu:
parent: 'victoriametrics'
weight: 12
title: vmalert-tool
aliases:
- /vmalert-tool.html
---
# vmalert-tool

View file

@ -1263,7 +1263,7 @@ The shortlist of configuration flags is the following:
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-remoteRead.basicAuth.password string
Optional basic auth password for -remoteRead.url

View file

@ -1206,6 +1206,8 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
Whether to use proxy protocol for connections accepted at the corresponding -httpListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . When proxy protocol is enabled, the http server cannot serve the regular /metrics endpoint. Use -pushmetrics.url for metrics pushing
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-idleConnTimeout duration
Defines a duration for idle (keep-alive) connections to exist. Consider setting this value less than "-http.idleConnTimeout". This should prevent possible "write: broken pipe" and "read: connection reset by peer" errors. (default 50s)
-internStringCacheExpireDuration duration
The expiry duration for caches for interned strings. See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache (default 6m0s)
-internStringDisableCache
@ -1287,7 +1289,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed as authKey=...
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-responseTimeout duration
The timeout for receiving a response from backend (default 5m0s)

View file

@ -19,6 +19,121 @@ var (
"See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache")
)
type internStringMap struct {
mu sync.Mutex
mutable map[string]string
mutableReads uint64
readonly atomic.Pointer[map[string]internStringMapEntry]
nextCleanupTime atomic.Uint64
}
type internStringMapEntry struct {
deadline uint64
s string
}
func newInternStringMap() *internStringMap {
ism := &internStringMap{
mutable: make(map[string]string),
}
readonly := make(map[string]internStringMapEntry)
ism.readonly.Store(&readonly)
ism.nextCleanupTime.Store(fasttime.UnixTimestamp() + 61)
return ism
}
func (m *internStringMap) getReadonly() map[string]internStringMapEntry {
return *m.readonly.Load()
}
func (m *internStringMap) intern(s string) string {
if *disableCache || len(s) > *internStringMaxLen {
return strings.Clone(s)
}
currentTime := fasttime.UnixTimestamp()
if currentTime >= m.nextCleanupTime.Load() {
m.nextCleanupTime.Store(currentTime + 61)
m.cleanup()
}
readonly := m.getReadonly()
e, ok := readonly[s]
if ok {
// Fast path - the string has been found in readonly map
return e.s
}
// Slower path - search for the string in mutable map
m.mu.Lock()
sInterned, ok := m.mutable[s]
if !ok {
// Verify whether s has already been registered in m.readonly by concurrent goroutines
readonly = m.getReadonly()
e, ok = readonly[s]
if !ok {
// Slowest path - register the string in mutable map.
// Make a new copy of s in order to remove references to the possibly bigger string s refers to.
sInterned = strings.Clone(s)
m.mutable[sInterned] = sInterned
} else {
sInterned = e.s
}
}
m.mutableReads++
if m.mutableReads > uint64(len(readonly)) {
m.migrateMutableToReadonlyLocked()
m.mutableReads = 0
}
m.mu.Unlock()
return sInterned
}
func (m *internStringMap) migrateMutableToReadonlyLocked() {
readonly := m.getReadonly()
readonlyCopy := make(map[string]internStringMapEntry, len(readonly)+len(m.mutable))
for k, e := range readonly {
readonlyCopy[k] = e
}
deadline := fasttime.UnixTimestamp() + uint64(cacheExpireDuration.Seconds()+0.5)
for k, s := range m.mutable {
readonlyCopy[k] = internStringMapEntry{
s: s,
deadline: deadline,
}
}
m.mutable = make(map[string]string)
m.readonly.Store(&readonlyCopy)
}
func (m *internStringMap) cleanup() {
m.mu.Lock()
defer m.mu.Unlock()
readonly := m.getReadonly()
currentTime := fasttime.UnixTimestamp()
needCleanup := false
for _, e := range readonly {
if e.deadline <= currentTime {
needCleanup = true
break
}
}
if !needCleanup {
return
}
readonlyCopy := make(map[string]internStringMapEntry, len(readonly))
for k, e := range readonly {
if e.deadline > currentTime {
readonlyCopy[k] = e
}
}
m.readonly.Store(&readonlyCopy)
}
func isSkipCache(s string) bool {
return *disableCache || len(s) > *internStringMaxLen
}
@ -33,52 +148,7 @@ func InternBytes(b []byte) string {
//
// This may be needed for reducing the amount of allocated memory.
func InternString(s string) string {
if isSkipCache(s) {
// Make a new copy for s in order to remove references from possible bigger string s refers to.
// This also protects from cases when s points to unsafe string - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3227
return strings.Clone(s)
}
ct := fasttime.UnixTimestamp()
if v, ok := internStringsMap.Load(s); ok {
e := v.(*ismEntry)
if e.lastAccessTime.Load()+10 < ct {
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
// in order to improve the fast path speed on systems with many CPU cores.
e.lastAccessTime.Store(ct)
}
return e.s
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := strings.Clone(s)
e := &ismEntry{
s: sCopy,
}
e.lastAccessTime.Store(ct)
internStringsMap.Store(sCopy, e)
if needCleanup(&internStringsMapLastCleanupTime, ct) {
// Perform a global cleanup for internStringsMap by removing items, which weren't accessed during the last 5 minutes.
m := &internStringsMap
deadline := ct - uint64(cacheExpireDuration.Seconds())
m.Range(func(k, v interface{}) bool {
e := v.(*ismEntry)
if e.lastAccessTime.Load() < deadline {
m.Delete(k)
}
return true
})
}
return sCopy
return ism.intern(s)
}
type ismEntry struct {
lastAccessTime atomic.Uint64
s string
}
var (
internStringsMap sync.Map
internStringsMapLastCleanupTime atomic.Uint64
)
var ism = newInternStringMap()
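For context, a stripped-down, self-contained sketch of the two-level interning scheme implemented above: lookups first hit a lock-free read-only snapshot; misses fall back to a mutex-protected mutable map, which is merged into a fresh snapshot once it has served more reads than the snapshot holds entries, so popular strings migrate to the lock-free fast path. Expiration is omitted and all names and thresholds here are illustrative, not the library's API.

package main

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

type internMap struct {
	mu           sync.Mutex
	mutable      map[string]string
	mutableReads uint64
	readonly     atomic.Pointer[map[string]string]
}

func newInternMap() *internMap {
	m := &internMap{mutable: make(map[string]string)}
	ro := make(map[string]string)
	m.readonly.Store(&ro)
	return m
}

func (m *internMap) intern(s string) string {
	// Fast path: a lock-free read from the current read-only snapshot.
	if v, ok := (*m.readonly.Load())[s]; ok {
		return v
	}
	// Slow path: consult the mutable map under the lock.
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.mutable[s]
	if !ok {
		v = strings.Clone(s) // drop references to any bigger backing string
		m.mutable[v] = v
	}
	m.mutableReads++
	// Merge the mutable map into a fresh snapshot once it has served more
	// reads than the snapshot holds entries; later reads take the fast path.
	if m.mutableReads > uint64(len(*m.readonly.Load())) {
		ro := *m.readonly.Load()
		merged := make(map[string]string, len(ro)+len(m.mutable))
		for k, val := range ro {
			merged[k] = val
		}
		for k, val := range m.mutable {
			merged[k] = val
		}
		m.readonly.Store(&merged)
		m.mutable = make(map[string]string)
		m.mutableReads = 0
	}
	return v
}

func main() {
	m := newInternMap()
	a := m.intern("foo")
	b := m.intern("foo")
	fmt.Println(a == b) // true: both calls return the same interned copy
}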

View file

@ -396,6 +396,18 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4128
fmt.Fprintf(w, "User-agent: *\nDisallow: /\n")
return
case "/config", "/-/reload":
// Only some components (vmagent, vmalert, etc.) support these handlers.
// These components are responsible for calling CheckAuthFlag.
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329
w = &responseWriterWithAbort{
ResponseWriter: w,
}
if !rh(w, r) {
Errorf(w, r, "unsupported path requested: %q", r.URL.Path)
unsupportedRequestErrors.Inc()
}
return
default:
if strings.HasPrefix(r.URL.Path, "/debug/pprof/") {
pprofRequests.Inc()

View file

@ -2,6 +2,7 @@ package firehose
import (
"fmt"
"html"
"net/http"
"time"
)
@ -12,11 +13,12 @@ import (
func WriteSuccessResponse(w http.ResponseWriter, r *http.Request) {
requestID := r.Header.Get("X-Amz-Firehose-Request-Id")
if requestID == "" {
// This isn't a AWS firehose request - just return an empty response in this case.
// This isn't an AWS firehose request - just return an empty response in this case.
w.WriteHeader(http.StatusOK)
return
}
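// Escape the untrusted X-Amz-Firehose-Request-Id header value before echoing it back in the response body.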
requestID = html.EscapeString(requestID)
body := fmt.Sprintf(`{"requestId":%q,"timestamp":%d}`, requestID, time.Now().UnixMilli())
h := w.Header()

View file

@ -2,10 +2,13 @@ package streamaggr
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
const dedupAggrShardsCount = 128
@ -24,7 +27,12 @@ type dedupAggrShard struct {
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]dedupAggrSample
m map[string]*dedupAggrSample
samplesBuf []dedupAggrSample
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
type dedupAggrSample struct {
@ -42,7 +50,7 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
for i := range da.shards {
n += da.shards[i].sizeBytes()
n += da.shards[i].sizeBytes.Load()
}
return n
}
@ -50,28 +58,11 @@ func (da *dedupAggr) sizeBytes() uint64 {
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
for i := range da.shards {
n += da.shards[i].itemsCount()
n += da.shards[i].itemsCount.Load()
}
return n
}
func (das *dedupAggrShard) sizeBytes() uint64 {
das.mu.Lock()
n := uint64(unsafe.Sizeof(*das))
for k, s := range das.m {
n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s))
}
das.mu.Unlock()
return n
}
func (das *dedupAggrShard) itemsCount() uint64 {
das.mu.Lock()
n := uint64(len(das.m))
das.mu.Unlock()
return n
}
func (da *dedupAggr) pushSamples(samples []pushSample) {
pss := getPerShardSamples()
shards := pss.shards
@ -113,7 +104,7 @@ func (ctx *dedupFlushCtx) reset() {
ctx.samples = ctx.samples[:0]
}
func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
func (da *dedupAggr) flush(f func(samples []pushSample)) {
var wg sync.WaitGroup
for i := range da.shards {
flushConcurrencyCh <- struct{}{}
@ -125,7 +116,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f, resetState)
shard.flush(ctx, f)
putDedupFlushCtx(ctx)
}(&da.shards[i])
}
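The flush above fans out one goroutine per shard while bounding parallelism via a buffered channel used as a semaphore. A minimal sketch of the same pattern, with an illustrative limit of 4 concurrent flushes and a stand-in work function:

package main

import (
	"fmt"
	"sync"
)

// The channel capacity caps how many flushes run at once.
var flushConcurrencyCh = make(chan struct{}, 4)

func flushAll(shards []int, flushShard func(int)) {
	var wg sync.WaitGroup
	for _, shard := range shards {
		flushConcurrencyCh <- struct{}{} // acquire a slot; blocks while 4 flushes are in flight
		wg.Add(1)
		go func(shard int) {
			defer func() {
				<-flushConcurrencyCh // release the slot
				wg.Done()
			}()
			flushShard(shard)
		}(shard)
	}
	wg.Wait() // wait until every shard has been flushed
}

func main() {
	flushAll([]int{0, 1, 2, 3, 4, 5, 6, 7}, func(shard int) {
		fmt.Println("flushed shard", shard)
	})
}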
@ -169,36 +160,43 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
m := das.m
if m == nil {
m = make(map[string]dedupAggrSample, len(samples))
m = make(map[string]*dedupAggrSample, len(samples))
das.m = m
}
samplesBuf := das.samplesBuf
for _, sample := range samples {
s, ok := m[sample.key]
if !ok {
samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value
s.timestamp = sample.timestamp
key := bytesutil.InternString(sample.key)
m[key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
m[key] = s
das.itemsCount.Add(1)
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
// Update the existing value according to the logic described at https://docs.victoriametrics.com/#deduplication
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
key := bytesutil.InternString(sample.key)
m[key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
s.value = sample.value
s.timestamp = sample.timestamp
}
}
das.samplesBuf = samplesBuf
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) {
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
das.mu.Lock()
m := das.m
if resetState && len(m) > 0 {
das.m = make(map[string]dedupAggrSample, len(m))
if len(m) > 0 {
das.m = make(map[string]*dedupAggrSample, len(m))
das.sizeBytes.Store(0)
das.itemsCount.Store(0)
das.samplesBuf = das.samplesBuf[:0]
}
das.mu.Unlock()
@ -216,7 +214,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
})
// Limit the number of samples per flush in order to limit memory usage.
if len(dstSamples) >= 100_000 {
if len(dstSamples) >= 10_000 {
f(dstSamples)
clear(dstSamples)
dstSamples = dstSamples[:0]
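The update rule in pushSamples above follows the deduplication logic at https://docs.victoriametrics.com/#deduplication: among samples sharing a key, the newest timestamp wins, and on equal timestamps the bigger value wins. A minimal sketch with illustrative types:

package main

import "fmt"

type sample struct {
	key       string
	value     float64
	timestamp int64
}

// dedup keeps at most one sample per key according to the rule above.
func dedup(samples []sample) map[string]sample {
	m := make(map[string]sample, len(samples))
	for _, s := range samples {
		prev, ok := m[s.key]
		if !ok || s.timestamp > prev.timestamp ||
			(s.timestamp == prev.timestamp && s.value > prev.value) {
			m[s.key] = s
		}
	}
	return m
}

func main() {
	out := dedup([]sample{
		{"cpu", 1.0, 100},
		{"cpu", 2.0, 100}, // equal timestamps: the bigger value wins
		{"cpu", 0.5, 200}, // a newer timestamp wins regardless of value
	})
	fmt.Printf("%+v\n", out["cpu"]) // {key:cpu value:0.5 timestamp:200}
}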

View file

@ -23,8 +23,8 @@ func TestDedupAggrSerial(t *testing.T) {
da.pushSamples(samples)
}
if n := da.sizeBytes(); n > 4_200_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n)
if n := da.sizeBytes(); n > 5_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@ -39,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) {
}
mu.Unlock()
}
da.flush(flushSamples, true)
da.flush(flushSamples)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)

View file

@ -4,7 +4,6 @@ import (
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@ -17,22 +16,8 @@ func BenchmarkDedupAggr(b *testing.B) {
}
}
func BenchmarkDedupAggrFlushSerial(b *testing.B) {
as := newTotalAggrState(time.Hour, true, true)
benchSamples := newBenchSamples(100_000)
da := newDedupAggr()
da.pushSamples(benchSamples)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSamples)))
for i := 0; i < b.N; i++ {
da.flush(as.pushSamples, false)
}
}
func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
const loops = 100
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()

View file

@ -166,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
}, true)
})
duration := time.Since(startTime)
d.dedupFlushDuration.Update(duration.Seconds())

View file

@ -747,7 +747,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
startTime := time.Now()
a.da.flush(a.pushSamples, true)
a.da.flush(a.pushSamples)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())