mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-11 14:53:49 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
This commit is contained in:
commit
56054f4eb7
9 changed files with 289 additions and 43 deletions
6
.github/workflows/main.yml
vendored
6
.github/workflows/main.yml
vendored
|
@ -19,11 +19,9 @@ jobs:
|
||||||
go-version: 1.15
|
go-version: 1.15
|
||||||
id: go
|
id: go
|
||||||
- name: Dependencies
|
- name: Dependencies
|
||||||
env:
|
|
||||||
GO111MODULE: off
|
|
||||||
run: |
|
run: |
|
||||||
go install golang.org/x/lint/golint
|
go get -u golang.org/x/lint/golint
|
||||||
go install github.com/kisielk/errcheck
|
go get -u github.com/kisielk/errcheck
|
||||||
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.29.0
|
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.29.0
|
||||||
- name: Code checkout
|
- name: Code checkout
|
||||||
uses: actions/checkout@master
|
uses: actions/checkout@master
|
||||||
|
|
6
Makefile
6
Makefile
|
@ -80,7 +80,7 @@ lint: install-golint
|
||||||
golint app/...
|
golint app/...
|
||||||
|
|
||||||
install-golint:
|
install-golint:
|
||||||
which golint || GO111MODULE=off go install golang.org/x/lint/golint
|
which golint || go install golang.org/x/lint/golint
|
||||||
|
|
||||||
errcheck: install-errcheck
|
errcheck: install-errcheck
|
||||||
errcheck -exclude=errcheck_excludes.txt ./lib/...
|
errcheck -exclude=errcheck_excludes.txt ./lib/...
|
||||||
|
@ -94,7 +94,7 @@ errcheck: install-errcheck
|
||||||
errcheck -exclude=errcheck_excludes.txt ./app/vmrestore/...
|
errcheck -exclude=errcheck_excludes.txt ./app/vmrestore/...
|
||||||
|
|
||||||
install-errcheck:
|
install-errcheck:
|
||||||
which errcheck || GO111MODULE=off go install github.com/kisielk/errcheck
|
which errcheck || go install github.com/kisielk/errcheck
|
||||||
|
|
||||||
check-all: fmt vet lint errcheck golangci-lint
|
check-all: fmt vet lint errcheck golangci-lint
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ quicktemplate-gen: install-qtc
|
||||||
qtc
|
qtc
|
||||||
|
|
||||||
install-qtc:
|
install-qtc:
|
||||||
which qtc || GO111MODULE=off go install github.com/valyala/quicktemplate/qtc
|
which qtc || go install github.com/valyala/quicktemplate/qtc
|
||||||
|
|
||||||
|
|
||||||
golangci-lint: install-golangci-lint
|
golangci-lint: install-golangci-lint
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -21,11 +20,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url")
|
sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", "Timeout for sending a single block of data to -remoteWrite.url")
|
||||||
proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+
|
proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+
|
||||||
"Example: -remoteWrite.proxyURL=socks5://proxy:1234")
|
"Example: -remoteWrite.proxyURL=socks5://proxy:1234")
|
||||||
|
|
||||||
tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url")
|
tlsInsecureSkipVerify = flagutil.NewArrayBool("remoteWrite.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to -remoteWrite.url")
|
||||||
tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+
|
tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+
|
||||||
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
tlsKeyFile = flagutil.NewArray("remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. "+
|
tlsKeyFile = flagutil.NewArray("remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. "+
|
||||||
|
@ -108,7 +107,7 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu
|
||||||
fq: fq,
|
fq: fq,
|
||||||
hc: &http.Client{
|
hc: &http.Client{
|
||||||
Transport: tr,
|
Transport: tr,
|
||||||
Timeout: *sendTimeout,
|
Timeout: sendTimeout.GetOptionalArgOrDefault(argIdx, time.Minute),
|
||||||
},
|
},
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
@ -140,7 +139,7 @@ func getTLSConfig(argIdx int) (*tls.Config, error) {
|
||||||
CertFile: tlsCertFile.GetOptionalArg(argIdx),
|
CertFile: tlsCertFile.GetOptionalArg(argIdx),
|
||||||
KeyFile: tlsKeyFile.GetOptionalArg(argIdx),
|
KeyFile: tlsKeyFile.GetOptionalArg(argIdx),
|
||||||
ServerName: tlsServerName.GetOptionalArg(argIdx),
|
ServerName: tlsServerName.GetOptionalArg(argIdx),
|
||||||
InsecureSkipVerify: *tlsInsecureSkipVerify,
|
InsecureSkipVerify: tlsInsecureSkipVerify.GetOptionalArg(argIdx),
|
||||||
}
|
}
|
||||||
if c.CAFile == "" && c.CertFile == "" && c.KeyFile == "" && c.ServerName == "" && !c.InsecureSkipVerify {
|
if c.CAFile == "" && c.CertFile == "" && c.KeyFile == "" && c.ServerName == "" && !c.InsecureSkipVerify {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
|
@ -1308,8 +1308,11 @@ func rollupDelta(rfa *rollupFuncArg) float64 {
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/894
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/894
|
||||||
return values[len(values)-1] - rfa.realPrevValue
|
return values[len(values)-1] - rfa.realPrevValue
|
||||||
}
|
}
|
||||||
// Assume that the previous non-existing value was 0
|
// Assume that the previous non-existing value was 0 only in the following cases:
|
||||||
// only if the first value doesn't exceed too much the delta with the next value.
|
//
|
||||||
|
// - If the delta with the next value equals to 0.
|
||||||
|
// This is the case for slow-changing counter - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
|
||||||
|
// - If the first value doesn't exceed too much the delta with the next value.
|
||||||
//
|
//
|
||||||
// This should prevent from improper increase() results for os-level counters
|
// This should prevent from improper increase() results for os-level counters
|
||||||
// such as cpu time or bytes sent over the network interface.
|
// such as cpu time or bytes sent over the network interface.
|
||||||
|
@ -1317,12 +1320,15 @@ func rollupDelta(rfa *rollupFuncArg) float64 {
|
||||||
//
|
//
|
||||||
// This also should prevent from improper increase() results when a part of label values are changed
|
// This also should prevent from improper increase() results when a part of label values are changed
|
||||||
// without counter reset.
|
// without counter reset.
|
||||||
d := float64(10)
|
var d float64
|
||||||
if len(values) > 1 {
|
if len(values) > 1 {
|
||||||
d = values[1] - values[0]
|
d = values[1] - values[0]
|
||||||
} else if !math.IsNaN(rfa.realNextValue) {
|
} else if !math.IsNaN(rfa.realNextValue) {
|
||||||
d = rfa.realNextValue - values[0]
|
d = rfa.realNextValue - values[0]
|
||||||
}
|
}
|
||||||
|
if d == 0 {
|
||||||
|
d = 10
|
||||||
|
}
|
||||||
if math.Abs(values[0]) < 10*(math.Abs(d)+1) {
|
if math.Abs(values[0]) < 10*(math.Abs(d)+1) {
|
||||||
prevValue = 0
|
prevValue = 0
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1171,8 +1171,16 @@ func TestRollupDelta(t *testing.T) {
|
||||||
f(nan, nan, nan, []float64{5, 6, 8}, 8)
|
f(nan, nan, nan, []float64{5, 6, 8}, 8)
|
||||||
f(2, nan, nan, []float64{5, 6, 8}, 6)
|
f(2, nan, nan, []float64{5, 6, 8}, 6)
|
||||||
|
|
||||||
// Too big initial value must be skipped.
|
// Moderate initial value with zero delta after that.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
|
||||||
|
f(nan, nan, nan, []float64{100}, 100)
|
||||||
|
f(nan, nan, nan, []float64{100, 100}, 100)
|
||||||
|
|
||||||
|
// Big initial value with with zero delta after that.
|
||||||
f(nan, nan, nan, []float64{1000}, 0)
|
f(nan, nan, nan, []float64{1000}, 0)
|
||||||
|
f(nan, nan, nan, []float64{1000, 1000}, 0)
|
||||||
|
|
||||||
|
// Big initial value with small delta after that.
|
||||||
f(nan, nan, nan, []float64{1000, 1001, 1002}, 2)
|
f(nan, nan, nan, []float64{1000, 1001, 1002}, 2)
|
||||||
|
|
||||||
// Non-nan realPrevValue
|
// Non-nan realPrevValue
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
* FEATURE: export `vm_promscrape_active_scrapers{type="<sd_type>"}` metric for tracking the number of active scrapers per each service discovery type.
|
* FEATURE: export `vm_promscrape_active_scrapers{type="<sd_type>"}` metric for tracking the number of active scrapers per each service discovery type.
|
||||||
* FEATURE: export `vm_promscrape_scrapers_started_total{type="<sd_type>"}` and `vm_promscrape_scrapers_stopped_total{type="<sd_type>"}` metrics for tracking churn rate for scrapers
|
* FEATURE: export `vm_promscrape_scrapers_started_total{type="<sd_type>"}` and `vm_promscrape_scrapers_stopped_total{type="<sd_type>"}` metrics for tracking churn rate for scrapers
|
||||||
per each service discovery type.
|
per each service discovery type.
|
||||||
|
* FEATURE: vmagent: allow setting per-`-remoteWrite.url` command-line flags for `-remoteWrite.sendTimeout` and `-remoteWrite.tlsInsecureSkipVerify`.
|
||||||
|
|
||||||
* BUGFIX: properly handle `*` and `[...]` inside curly braces in query passed to Graphite Metrics API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/952
|
* BUGFIX: properly handle `*` and `[...]` inside curly braces in query passed to Graphite Metrics API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/952
|
||||||
* BUGFIX: vmagent: fix memory leak when big number of targets is discovered via service discovery.
|
* BUGFIX: vmagent: fix memory leak when big number of targets is discovered via service discovery.
|
||||||
|
@ -20,6 +21,8 @@
|
||||||
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
|
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
|
||||||
* BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955
|
* BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955
|
||||||
* BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964
|
* BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964
|
||||||
|
* BUGFIX: vmagent: properly handle scrape errors when stream parsing is enabled with `-promscrape.streamParse` command-line flag or with `stream_parse: true` per-target config option. Previously such errors weren't reported at `/targets` page. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967
|
||||||
|
* BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between two first points equals to 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
|
||||||
|
|
||||||
|
|
||||||
# [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0)
|
# [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewArray returns new Array with the given name and description.
|
// NewArray returns new Array with the given name and description.
|
||||||
|
@ -16,6 +17,24 @@ func NewArray(name, description string) *Array {
|
||||||
return &a
|
return &a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewArrayDuration returns new ArrayDuration with the given name and description.
|
||||||
|
func NewArrayDuration(name, description string) *ArrayDuration {
|
||||||
|
description += "\nSupports `array` of values separated by comma" +
|
||||||
|
" or specified via multiple flags."
|
||||||
|
var a ArrayDuration
|
||||||
|
flag.Var(&a, name, description)
|
||||||
|
return &a
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewArrayBool returns new ArrayBool with the given name and description.
|
||||||
|
func NewArrayBool(name, description string) *ArrayBool {
|
||||||
|
description += "\nSupports `array` of values separated by comma" +
|
||||||
|
" or specified via multiple flags."
|
||||||
|
var a ArrayBool
|
||||||
|
flag.Var(&a, name, description)
|
||||||
|
return &a
|
||||||
|
}
|
||||||
|
|
||||||
// Array is a flag that holds an array of values.
|
// Array is a flag that holds an array of values.
|
||||||
//
|
//
|
||||||
// It may be set either by specifying multiple flags with the given name
|
// It may be set either by specifying multiple flags with the given name
|
||||||
|
@ -124,3 +143,83 @@ func (a *Array) GetOptionalArg(argIdx int) string {
|
||||||
}
|
}
|
||||||
return x[argIdx]
|
return x[argIdx]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ArrayBool is a flag that holds an array of booleans values.
|
||||||
|
// have the same api as Array.
|
||||||
|
type ArrayBool []bool
|
||||||
|
|
||||||
|
// IsBoolFlag implements flag.IsBoolFlag interface
|
||||||
|
func (a *ArrayBool) IsBoolFlag() bool { return true }
|
||||||
|
|
||||||
|
// String implements flag.Value interface
|
||||||
|
func (a *ArrayBool) String() string {
|
||||||
|
formattedBools := make([]string, len(*a))
|
||||||
|
for i, v := range *a {
|
||||||
|
formattedBools[i] = strconv.FormatBool(v)
|
||||||
|
}
|
||||||
|
return strings.Join(formattedBools, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements flag.Value interface
|
||||||
|
func (a *ArrayBool) Set(value string) error {
|
||||||
|
values := parseArrayValues(value)
|
||||||
|
for _, v := range values {
|
||||||
|
b, err := strconv.ParseBool(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*a = append(*a, b)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOptionalArg returns optional arg under the given argIdx.
|
||||||
|
func (a *ArrayBool) GetOptionalArg(argIdx int) bool {
|
||||||
|
x := *a
|
||||||
|
if argIdx >= len(x) {
|
||||||
|
if len(x) == 1 {
|
||||||
|
return x[0]
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return x[argIdx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// ArrayDuration is a flag that holds an array of time.Duration values.
|
||||||
|
// have the same api as Array.
|
||||||
|
type ArrayDuration []time.Duration
|
||||||
|
|
||||||
|
// String implements flag.Value interface
|
||||||
|
func (a *ArrayDuration) String() string {
|
||||||
|
formattedBools := make([]string, len(*a))
|
||||||
|
for i, v := range *a {
|
||||||
|
formattedBools[i] = v.String()
|
||||||
|
}
|
||||||
|
return strings.Join(formattedBools, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements flag.Value interface
|
||||||
|
func (a *ArrayDuration) Set(value string) error {
|
||||||
|
values := parseArrayValues(value)
|
||||||
|
for _, v := range values {
|
||||||
|
b, err := time.ParseDuration(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*a = append(*a, b)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOptionalArgOrDefault returns optional arg under the given argIdx,
|
||||||
|
// or default value, if argIdx not found.
|
||||||
|
func (a *ArrayDuration) GetOptionalArgOrDefault(argIdx int, defaultValue time.Duration) time.Duration {
|
||||||
|
x := *a
|
||||||
|
if argIdx >= len(x) {
|
||||||
|
if len(x) == 1 {
|
||||||
|
return x[0]
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
return x[argIdx]
|
||||||
|
}
|
||||||
|
|
|
@ -5,13 +5,21 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var fooFlag Array
|
var (
|
||||||
|
fooFlag Array
|
||||||
|
fooFlagDuration ArrayDuration
|
||||||
|
fooFlagBool ArrayBool
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
os.Args = append(os.Args, "--fooFlag=foo", "--fooFlag=bar")
|
os.Args = append(os.Args, "--fooFlag=foo", "--fooFlag=bar", "--fooFlagDuration=10s", "--fooFlagDuration=5m")
|
||||||
|
os.Args = append(os.Args, "--fooFlagBool=true", "--fooFlagBool=false,true", "--fooFlagBool")
|
||||||
flag.Var(&fooFlag, "fooFlag", "test")
|
flag.Var(&fooFlag, "fooFlag", "test")
|
||||||
|
flag.Var(&fooFlagDuration, "fooFlagDuration", "test")
|
||||||
|
flag.Var(&fooFlagBool, "fooFlagBool", "test")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
|
@ -91,3 +99,123 @@ func TestArrayString(t *testing.T) {
|
||||||
f(`", foo","b\"ar",`)
|
f(`", foo","b\"ar",`)
|
||||||
f(`,"\nfoo\\",bar`)
|
f(`,"\nfoo\\",bar`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestArrayDuration(t *testing.T) {
|
||||||
|
expected := map[time.Duration]struct{}{
|
||||||
|
time.Second * 10: {},
|
||||||
|
time.Minute * 5: {},
|
||||||
|
}
|
||||||
|
if len(expected) != len(fooFlagDuration) {
|
||||||
|
t.Errorf("len array flag (%d) is not equal to %d", len(fooFlag), len(expected))
|
||||||
|
}
|
||||||
|
for _, i := range fooFlagDuration {
|
||||||
|
if _, ok := expected[i]; !ok {
|
||||||
|
t.Errorf("unexpected item in array %v", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayDurationSet(t *testing.T) {
|
||||||
|
f := func(s string, expectedValues []time.Duration) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayDuration
|
||||||
|
_ = a.Set(s)
|
||||||
|
if !reflect.DeepEqual([]time.Duration(a), expectedValues) {
|
||||||
|
t.Fatalf("unexpected values parsed;\ngot\n%q\nwant\n%q", a, expectedValues)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("", nil)
|
||||||
|
f(`1m`, []time.Duration{time.Minute})
|
||||||
|
f(`5m,1s,1h`, []time.Duration{time.Minute * 5, time.Second, time.Hour})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayDurationGetOptionalArg(t *testing.T) {
|
||||||
|
f := func(s string, argIdx int, expectedValue time.Duration, defaultValue time.Duration) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayDuration
|
||||||
|
_ = a.Set(s)
|
||||||
|
v := a.GetOptionalArgOrDefault(argIdx, defaultValue)
|
||||||
|
if v != expectedValue {
|
||||||
|
t.Fatalf("unexpected value; got %q; want %q", v, expectedValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("", 0, time.Second, time.Second)
|
||||||
|
f("", 1, time.Minute, time.Minute)
|
||||||
|
f("10s,1m", 1, time.Minute, time.Minute)
|
||||||
|
f("10s", 3, time.Second*10, time.Minute)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayDurationString(t *testing.T) {
|
||||||
|
f := func(s string) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayDuration
|
||||||
|
_ = a.Set(s)
|
||||||
|
result := a.String()
|
||||||
|
if result != s {
|
||||||
|
t.Fatalf("unexpected string;\ngot\n%s\nwant\n%s", result, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("")
|
||||||
|
f("10s,1m0s")
|
||||||
|
f("5m0s,1s")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayBool(t *testing.T) {
|
||||||
|
expected := []bool{
|
||||||
|
true, false, true, true,
|
||||||
|
}
|
||||||
|
if len(expected) != len(fooFlagBool) {
|
||||||
|
t.Errorf("len array flag (%d) is not equal to %d", len(fooFlag), len(expected))
|
||||||
|
}
|
||||||
|
for i, v := range fooFlagBool {
|
||||||
|
if v != expected[i] {
|
||||||
|
t.Errorf("unexpected item in array index=%v,value=%v,want=%v", i, v, expected[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayBoolSet(t *testing.T) {
|
||||||
|
f := func(s string, expectedValues []bool) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayBool
|
||||||
|
_ = a.Set(s)
|
||||||
|
if !reflect.DeepEqual([]bool(a), expectedValues) {
|
||||||
|
t.Fatalf("unexpected values parsed;\ngot\n%v\nwant\n%v", a, expectedValues)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("", nil)
|
||||||
|
f(`true`, []bool{true})
|
||||||
|
f(`false,True,False`, []bool{false, true, false})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayBoolGetOptionalArg(t *testing.T) {
|
||||||
|
f := func(s string, argIdx int, expectedValue bool) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayBool
|
||||||
|
_ = a.Set(s)
|
||||||
|
v := a.GetOptionalArg(argIdx)
|
||||||
|
if v != expectedValue {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", v, expectedValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("", 0, false)
|
||||||
|
f("", 1, false)
|
||||||
|
f("true,true,false", 1, true)
|
||||||
|
f("true", 2, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArrayBoolString(t *testing.T) {
|
||||||
|
f := func(s string) {
|
||||||
|
t.Helper()
|
||||||
|
var a ArrayBool
|
||||||
|
_ = a.Set(s)
|
||||||
|
result := a.String()
|
||||||
|
if result != s {
|
||||||
|
t.Fatalf("unexpected string;\ngot\n%s\nwant\n%s", result, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("")
|
||||||
|
f("true")
|
||||||
|
f("true,false")
|
||||||
|
f("false,true")
|
||||||
|
}
|
||||||
|
|
|
@ -313,38 +313,43 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||||
sr, err := sw.GetStreamReader()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read data: %s", err)
|
|
||||||
}
|
|
||||||
samplesScraped := 0
|
samplesScraped := 0
|
||||||
samplesPostRelabeling := 0
|
samplesPostRelabeling := 0
|
||||||
|
responseSize := int64(0)
|
||||||
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
|
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
|
||||||
var mu sync.Mutex
|
|
||||||
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
sr, err := sw.GetStreamReader()
|
||||||
mu.Lock()
|
if err != nil {
|
||||||
defer mu.Unlock()
|
err = fmt.Errorf("cannot read data: %s", err)
|
||||||
samplesScraped += len(rows)
|
} else {
|
||||||
for i := range rows {
|
var mu sync.Mutex
|
||||||
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
|
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
||||||
}
|
mu.Lock()
|
||||||
// Push the collected rows to sw before returning from the callback, since they cannot be held
|
defer mu.Unlock()
|
||||||
// after returning from the callback - this will result in data race.
|
samplesScraped += len(rows)
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
|
for i := range rows {
|
||||||
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
|
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
|
||||||
sw.updateSeriesAdded(wc)
|
}
|
||||||
startTime := time.Now()
|
// Push the collected rows to sw before returning from the callback, since they cannot be held
|
||||||
sw.PushData(&wc.writeRequest)
|
// after returning from the callback - this will result in data race.
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
|
||||||
wc.resetNoRows()
|
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
|
||||||
return nil
|
sw.updateSeriesAdded(wc)
|
||||||
})
|
startTime := time.Now()
|
||||||
|
sw.PushData(&wc.writeRequest)
|
||||||
|
pushDataDuration.UpdateDuration(startTime)
|
||||||
|
wc.resetNoRows()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
responseSize = sr.bytesRead
|
||||||
|
sr.MustClose()
|
||||||
|
}
|
||||||
|
|
||||||
scrapedSamples.Update(float64(samplesScraped))
|
scrapedSamples.Update(float64(samplesScraped))
|
||||||
endTimestamp := time.Now().UnixNano() / 1e6
|
endTimestamp := time.Now().UnixNano() / 1e6
|
||||||
duration := float64(endTimestamp-realTimestamp) / 1e3
|
duration := float64(endTimestamp-realTimestamp) / 1e3
|
||||||
scrapeDuration.Update(duration)
|
scrapeDuration.Update(duration)
|
||||||
scrapeResponseSize.Update(float64(sr.bytesRead))
|
scrapeResponseSize.Update(float64(responseSize))
|
||||||
sr.MustClose()
|
|
||||||
up := 1
|
up := 1
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if samplesScraped == 0 {
|
if samplesScraped == 0 {
|
||||||
|
|
Loading…
Reference in a new issue