diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4662926ee..d0291479d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,11 +19,9 @@ jobs: go-version: 1.15 id: go - name: Dependencies - env: - GO111MODULE: off run: | - go install golang.org/x/lint/golint - go install github.com/kisielk/errcheck + go get -u golang.org/x/lint/golint + go get -u github.com/kisielk/errcheck curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.29.0 - name: Code checkout uses: actions/checkout@master diff --git a/Makefile b/Makefile index b20c241fa..53307e25a 100644 --- a/Makefile +++ b/Makefile @@ -80,7 +80,7 @@ lint: install-golint golint app/... install-golint: - which golint || GO111MODULE=off go install golang.org/x/lint/golint + which golint || go install golang.org/x/lint/golint errcheck: install-errcheck errcheck -exclude=errcheck_excludes.txt ./lib/... @@ -94,7 +94,7 @@ errcheck: install-errcheck errcheck -exclude=errcheck_excludes.txt ./app/vmrestore/... 
install-errcheck: - which errcheck || GO111MODULE=off go install github.com/kisielk/errcheck + which errcheck || go install github.com/kisielk/errcheck check-all: fmt vet lint errcheck golangci-lint @@ -140,7 +140,7 @@ quicktemplate-gen: install-qtc qtc install-qtc: - which qtc || GO111MODULE=off go install github.com/valyala/quicktemplate/qtc + which qtc || go install github.com/valyala/quicktemplate/qtc golangci-lint: install-golangci-lint diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index a169f774b..2ffdfab12 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -4,7 +4,6 @@ import ( "bytes" "crypto/tls" "encoding/base64" - "flag" "fmt" "io/ioutil" "net/http" @@ -21,11 +20,11 @@ import ( ) var ( - sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url") + sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", "Timeout for sending a single block of data to -remoteWrite.url") proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+ "Example: -remoteWrite.proxyURL=socks5://proxy:1234") - tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url") + tlsInsecureSkipVerify = flagutil.NewArrayBool("remoteWrite.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to -remoteWrite.url") tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+ "If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url") tlsKeyFile = flagutil.NewArray("remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. 
"+ @@ -108,7 +107,7 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu fq: fq, hc: &http.Client{ Transport: tr, - Timeout: *sendTimeout, + Timeout: sendTimeout.GetOptionalArgOrDefault(argIdx, time.Minute), }, stopCh: make(chan struct{}), } @@ -140,7 +139,7 @@ func getTLSConfig(argIdx int) (*tls.Config, error) { CertFile: tlsCertFile.GetOptionalArg(argIdx), KeyFile: tlsKeyFile.GetOptionalArg(argIdx), ServerName: tlsServerName.GetOptionalArg(argIdx), - InsecureSkipVerify: *tlsInsecureSkipVerify, + InsecureSkipVerify: tlsInsecureSkipVerify.GetOptionalArg(argIdx), } if c.CAFile == "" && c.CertFile == "" && c.KeyFile == "" && c.ServerName == "" && !c.InsecureSkipVerify { return nil, nil diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 3bb5f8463..cedcf1958 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -1308,8 +1308,11 @@ func rollupDelta(rfa *rollupFuncArg) float64 { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/894 return values[len(values)-1] - rfa.realPrevValue } - // Assume that the previous non-existing value was 0 - // only if the first value doesn't exceed too much the delta with the next value. + // Assume that the previous non-existing value was 0 only in the following cases: + // + // - If the delta with the next value equals to 0. + // This is the case for slow-changing counter - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962 + // - If the first value doesn't exceed too much the delta with the next value. // // This should prevent from improper increase() results for os-level counters // such as cpu time or bytes sent over the network interface. @@ -1317,12 +1320,15 @@ func rollupDelta(rfa *rollupFuncArg) float64 { // // This also should prevent from improper increase() results when a part of label values are changed // without counter reset. 
- d := float64(10) + var d float64 if len(values) > 1 { d = values[1] - values[0] } else if !math.IsNaN(rfa.realNextValue) { d = rfa.realNextValue - values[0] } + if d == 0 { + d = 10 + } if math.Abs(values[0]) < 10*(math.Abs(d)+1) { prevValue = 0 } else { diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index ad9a7b7e4..b736d6259 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -1171,8 +1171,16 @@ func TestRollupDelta(t *testing.T) { f(nan, nan, nan, []float64{5, 6, 8}, 8) f(2, nan, nan, []float64{5, 6, 8}, 6) - // Too big initial value must be skipped. + // Moderate initial value with zero delta after that. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962 + f(nan, nan, nan, []float64{100}, 100) + f(nan, nan, nan, []float64{100, 100}, 100) + + // Big initial value with zero delta after that. f(nan, nan, nan, []float64{1000}, 0) + f(nan, nan, nan, []float64{1000, 1000}, 0) + + // Big initial value with small delta after that. f(nan, nan, nan, []float64{1000, 1001, 1002}, 2) // Non-nan realPrevValue diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c24f4458e..b8768928f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,6 +11,7 @@ * FEATURE: export `vm_promscrape_active_scrapers{type=""}` metric for tracking the number of active scrapers per each service discovery type. * FEATURE: export `vm_promscrape_scrapers_started_total{type=""}` and `vm_promscrape_scrapers_stopped_total{type=""}` metrics for tracking churn rate for scrapers per each service discovery type. +* FEATURE: vmagent: allow setting per-`-remoteWrite.url` command-line flags for `-remoteWrite.sendTimeout` and `-remoteWrite.tlsInsecureSkipVerify`. * BUGFIX: properly handle `*` and `[...]` inside curly braces in query passed to Graphite Metrics API.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/952 * BUGFIX: vmagent: fix memory leak when big number of targets is discovered via service discovery. @@ -20,6 +21,8 @@ * BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944 * BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955 * BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964 +* BUGFIX: vmagent: properly handle scrape errors when stream parsing is enabled with `-promscrape.streamParse` command-line flag or with `stream_parse: true` per-target config option. Previously such errors weren't reported at `/targets` page. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967 +* BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between two first points equals to 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962 # [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0) diff --git a/lib/flagutil/array.go b/lib/flagutil/array.go index d4454795f..51f7d2f5b 100644 --- a/lib/flagutil/array.go +++ b/lib/flagutil/array.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "time" ) // NewArray returns new Array with the given name and description. @@ -16,6 +17,24 @@ func NewArray(name, description string) *Array { return &a } +// NewArrayDuration returns new ArrayDuration with the given name and description. 
+func NewArrayDuration(name, description string) *ArrayDuration { + description += "\nSupports `array` of values separated by comma" + + " or specified via multiple flags." + var a ArrayDuration + flag.Var(&a, name, description) + return &a +} + +// NewArrayBool returns new ArrayBool with the given name and description. +func NewArrayBool(name, description string) *ArrayBool { + description += "\nSupports `array` of values separated by comma" + + " or specified via multiple flags." + var a ArrayBool + flag.Var(&a, name, description) + return &a +} + // Array is a flag that holds an array of values. // // It may be set either by specifying multiple flags with the given name @@ -124,3 +143,83 @@ func (a *Array) GetOptionalArg(argIdx int) string { } return x[argIdx] } + +// ArrayBool is a flag that holds an array of boolean values. +// It has the same API as Array. +type ArrayBool []bool + +// IsBoolFlag implements flag.IsBoolFlag interface +func (a *ArrayBool) IsBoolFlag() bool { return true } + +// String implements flag.Value interface +func (a *ArrayBool) String() string { + formattedBools := make([]string, len(*a)) + for i, v := range *a { + formattedBools[i] = strconv.FormatBool(v) + } + return strings.Join(formattedBools, ",") +} + +// Set implements flag.Value interface +func (a *ArrayBool) Set(value string) error { + values := parseArrayValues(value) + for _, v := range values { + b, err := strconv.ParseBool(v) + if err != nil { + return err + } + *a = append(*a, b) + } + return nil +} + +// GetOptionalArg returns optional arg under the given argIdx. +func (a *ArrayBool) GetOptionalArg(argIdx int) bool { + x := *a + if argIdx >= len(x) { + if len(x) == 1 { + return x[0] + } + return false + } + return x[argIdx] +} + +// ArrayDuration is a flag that holds an array of time.Duration values. +// It has the same API as Array.
+type ArrayDuration []time.Duration + +// String implements flag.Value interface +func (a *ArrayDuration) String() string { + formattedBools := make([]string, len(*a)) + for i, v := range *a { + formattedBools[i] = v.String() + } + return strings.Join(formattedBools, ",") +} + +// Set implements flag.Value interface +func (a *ArrayDuration) Set(value string) error { + values := parseArrayValues(value) + for _, v := range values { + b, err := time.ParseDuration(v) + if err != nil { + return err + } + *a = append(*a, b) + } + return nil +} + +// GetOptionalArgOrDefault returns optional arg under the given argIdx, +// or default value, if argIdx not found. +func (a *ArrayDuration) GetOptionalArgOrDefault(argIdx int, defaultValue time.Duration) time.Duration { + x := *a + if argIdx >= len(x) { + if len(x) == 1 { + return x[0] + } + return defaultValue + } + return x[argIdx] +} diff --git a/lib/flagutil/array_test.go b/lib/flagutil/array_test.go index a29e59e17..7e27ab2ff 100644 --- a/lib/flagutil/array_test.go +++ b/lib/flagutil/array_test.go @@ -5,13 +5,21 @@ import ( "os" "reflect" "testing" + "time" ) -var fooFlag Array +var ( + fooFlag Array + fooFlagDuration ArrayDuration + fooFlagBool ArrayBool +) func init() { - os.Args = append(os.Args, "--fooFlag=foo", "--fooFlag=bar") + os.Args = append(os.Args, "--fooFlag=foo", "--fooFlag=bar", "--fooFlagDuration=10s", "--fooFlagDuration=5m") + os.Args = append(os.Args, "--fooFlagBool=true", "--fooFlagBool=false,true", "--fooFlagBool") flag.Var(&fooFlag, "fooFlag", "test") + flag.Var(&fooFlagDuration, "fooFlagDuration", "test") + flag.Var(&fooFlagBool, "fooFlagBool", "test") } func TestMain(m *testing.M) { @@ -91,3 +99,123 @@ func TestArrayString(t *testing.T) { f(`", foo","b\"ar",`) f(`,"\nfoo\\",bar`) } + +func TestArrayDuration(t *testing.T) { + expected := map[time.Duration]struct{}{ + time.Second * 10: {}, + time.Minute * 5: {}, + } + if len(expected) != len(fooFlagDuration) { + t.Errorf("len array flag (%d) is not 
equal to %d", len(fooFlag), len(expected)) + } + for _, i := range fooFlagDuration { + if _, ok := expected[i]; !ok { + t.Errorf("unexpected item in array %v", i) + } + } +} + +func TestArrayDurationSet(t *testing.T) { + f := func(s string, expectedValues []time.Duration) { + t.Helper() + var a ArrayDuration + _ = a.Set(s) + if !reflect.DeepEqual([]time.Duration(a), expectedValues) { + t.Fatalf("unexpected values parsed;\ngot\n%q\nwant\n%q", a, expectedValues) + } + } + f("", nil) + f(`1m`, []time.Duration{time.Minute}) + f(`5m,1s,1h`, []time.Duration{time.Minute * 5, time.Second, time.Hour}) +} + +func TestArrayDurationGetOptionalArg(t *testing.T) { + f := func(s string, argIdx int, expectedValue time.Duration, defaultValue time.Duration) { + t.Helper() + var a ArrayDuration + _ = a.Set(s) + v := a.GetOptionalArgOrDefault(argIdx, defaultValue) + if v != expectedValue { + t.Fatalf("unexpected value; got %q; want %q", v, expectedValue) + } + } + f("", 0, time.Second, time.Second) + f("", 1, time.Minute, time.Minute) + f("10s,1m", 1, time.Minute, time.Minute) + f("10s", 3, time.Second*10, time.Minute) +} + +func TestArrayDurationString(t *testing.T) { + f := func(s string) { + t.Helper() + var a ArrayDuration + _ = a.Set(s) + result := a.String() + if result != s { + t.Fatalf("unexpected string;\ngot\n%s\nwant\n%s", result, s) + } + } + f("") + f("10s,1m0s") + f("5m0s,1s") +} + +func TestArrayBool(t *testing.T) { + expected := []bool{ + true, false, true, true, + } + if len(expected) != len(fooFlagBool) { + t.Errorf("len array flag (%d) is not equal to %d", len(fooFlag), len(expected)) + } + for i, v := range fooFlagBool { + if v != expected[i] { + t.Errorf("unexpected item in array index=%v,value=%v,want=%v", i, v, expected[i]) + } + } +} + +func TestArrayBoolSet(t *testing.T) { + f := func(s string, expectedValues []bool) { + t.Helper() + var a ArrayBool + _ = a.Set(s) + if !reflect.DeepEqual([]bool(a), expectedValues) { + t.Fatalf("unexpected values 
parsed;\ngot\n%v\nwant\n%v", a, expectedValues) + } + } + f("", nil) + f(`true`, []bool{true}) + f(`false,True,False`, []bool{false, true, false}) +} + +func TestArrayBoolGetOptionalArg(t *testing.T) { + f := func(s string, argIdx int, expectedValue bool) { + t.Helper() + var a ArrayBool + _ = a.Set(s) + v := a.GetOptionalArg(argIdx) + if v != expectedValue { + t.Fatalf("unexpected value; got %v; want %v", v, expectedValue) + } + } + f("", 0, false) + f("", 1, false) + f("true,true,false", 1, true) + f("true", 2, true) +} + +func TestArrayBoolString(t *testing.T) { + f := func(s string) { + t.Helper() + var a ArrayBool + _ = a.Set(s) + result := a.String() + if result != s { + t.Fatalf("unexpected string;\ngot\n%s\nwant\n%s", result, s) + } + } + f("") + f("true") + f("true,false") + f("false,true") +} diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 58392cd93..e1ea2958f 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -313,38 +313,43 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error } func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { - sr, err := sw.GetStreamReader() - if err != nil { - return fmt.Errorf("cannot read data: %s", err) - } samplesScraped := 0 samplesPostRelabeling := 0 + responseSize := int64(0) wc := writeRequestCtxPool.Get(sw.prevRowsLen) - var mu sync.Mutex - err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { - mu.Lock() - defer mu.Unlock() - samplesScraped += len(rows) - for i := range rows { - sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) - } - // Push the collected rows to sw before returning from the callback, since they cannot be held - // after returning from the callback - this will result in data race. 
- // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - sw.updateSeriesAdded(wc) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) - wc.resetNoRows() - return nil - }) + + sr, err := sw.GetStreamReader() + if err != nil { + err = fmt.Errorf("cannot read data: %s", err) + } else { + var mu sync.Mutex + err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { + mu.Lock() + defer mu.Unlock() + samplesScraped += len(rows) + for i := range rows { + sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) + } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in data race. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + sw.updateSeriesAdded(wc) + startTime := time.Now() + sw.PushData(&wc.writeRequest) + pushDataDuration.UpdateDuration(startTime) + wc.resetNoRows() + return nil + }) + responseSize = sr.bytesRead + sr.MustClose() + } + scrapedSamples.Update(float64(samplesScraped)) endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) - scrapeResponseSize.Update(float64(sr.bytesRead)) - sr.MustClose() + scrapeResponseSize.Update(float64(responseSize)) up := 1 if err != nil { if samplesScraped == 0 {