Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00.
Makefile: update golangci-lint from v1.51.2 to v1.54.2
See https://github.com/golangci/golangci-lint/releases/tag/v1.54.2
Parent: e0e856d2e7
Commit: edee262ecc
53 changed files with 129 additions and 138 deletions
Makefile (+1 −1)
@@ -463,7 +463,7 @@ golangci-lint: install-golangci-lint
 	golangci-lint run
 
 install-golangci-lint:
-	which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.51.2
+	which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.54.2
 
 govulncheck: install-govulncheck
 	govulncheck ./...
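The pinned version above is the only functional change; every hunk below is a mechanical fix for checks that are stricter in v1.54.2, reproducible locally with `make golangci-lint`. The most frequent fix renames parameters that are declared but never read. A minimal sketch of the pattern, assuming a revive-style unused-parameter rule is what fires (this diff does not show the repository's linter configuration):

    package example

    import "net/http"

    // Before: r is declared but never read, which the stricter linter reports.
    func okHandler(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    }

    // After: renaming the parameter to _ silences the check while keeping the
    // signature compatible with http.HandlerFunc.
    func okHandlerFixed(w http.ResponseWriter, _ *http.Request) {
        w.WriteHeader(http.StatusOK)
    }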
@@ -258,7 +258,7 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
 		if *showRemoteWriteURL {
 			sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
 		}
-		rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
+		rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
 	}
 
 	if !*keepDanglingQueues {
@@ -559,7 +559,7 @@ type remoteWriteCtx struct {
 	rowsDroppedByRelabel *metrics.Counter
 }
 
-func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
+func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
 	// strip query params, otherwise changing params resets pq
 	pqURL := *remoteWriteURL
 	pqURL.RawQuery = ""
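Where nothing constrains the signature (no interface to satisfy, no shared function type), the cleanup goes further and deletes the dead parameter, as with the at *auth.Token argument removed from both newRemoteWriteCtx and its call site above; unparam or revive would flag it, though which rule fires depends on configuration not shown here. The same two-sided edit in miniature, with illustrative names that are not from the repository:

    package example

    import "fmt"

    // Before: describe(idx int, token string) - token was never read, so the
    // definition and every call site drop it together.
    func describe(idx int) string {
        return fmt.Sprintf("rwctx-%d", idx)
    }

    func use() string {
        return describe(1) // was: describe(1, token)
    }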
@@ -27,7 +27,7 @@ var (
 	stdDialerOnce sync.Once
 )
 
-func statDial(ctx context.Context, networkUnused, addr string) (conn net.Conn, err error) {
+func statDial(ctx context.Context, _, addr string) (conn net.Conn, err error) {
 	network := netutil.GetTCPNetwork()
 	d := getStdDialer()
 	conn, err = d.DialContext(ctx, network, addr)
@@ -872,7 +872,6 @@ func TestAlertingRule_Template(t *testing.T) {
 			gotAlert := tc.rule.alerts[hash]
 			if gotAlert == nil {
 				t.Fatalf("alert %d is missing; labels: %v; annotations: %v", hash, expAlert.Labels, expAlert.Annotations)
-				break
 			}
 			if !reflect.DeepEqual(expAlert.Annotations, gotAlert.Annotations) {
 				t.Fatalf("expected to have annotations %#v; got %#v", expAlert.Annotations, gotAlert.Annotations)
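The deleted break was dead code: t.Fatalf marks the test as failed and then stops the test goroutine via FailNow/runtime.Goexit, so control never reaches the next statement in the block. A minimal sketch:

    package example

    import "testing"

    func requireAlert(t *testing.T, gotAlert interface{}) {
        if gotAlert == nil {
            t.Fatalf("alert is missing")
            // Unreachable: Fatalf calls FailNow, which stops this goroutine,
            // so a break or return here can never run.
        }
    }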
@@ -79,7 +79,7 @@ func TestRule_state(t *testing.T) {
 // TestRule_stateConcurrent supposed to test concurrent
 // execution of state updates.
 // Should be executed with -race flag
-func TestRule_stateConcurrent(t *testing.T) {
+func TestRule_stateConcurrent(_ *testing.T) {
 	state := newRuleState(20)
 
 	const workers = 50
@@ -41,7 +41,7 @@ func TestErrGroup(t *testing.T) {
 // TestErrGroupConcurrent supposed to test concurrent
 // use of error group.
 // Should be executed with -race flag
-func TestErrGroupConcurrent(t *testing.T) {
+func TestErrGroupConcurrent(_ *testing.T) {
 	eg := new(ErrGroup)
 
 	const writersN = 4
@@ -170,8 +170,5 @@ func (op *otsdbProcessor) do(s queryObj) error {
 			Timestamps: data.Timestamps,
 			Values:     data.Values,
 		}
-		if err := op.im.Input(&ts); err != nil {
-			return err
-		}
-		return nil
+		return op.im.Input(&ts)
 	}
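The vmctl change collapses an "if err := f(); err != nil { return err }; return nil" tail into "return f()". The two forms are equivalent whenever nothing happens between the call and the returns; revive's if-return rule is the likely reporter (an assumption, since the linter config is not part of this diff). Equivalent sketch:

    package example

    import "errors"

    func input(v int) error {
        if v < 0 {
            return errors.New("negative value")
        }
        return nil
    }

    // Before:
    //     if err := input(v); err != nil {
    //         return err
    //     }
    //     return nil
    // After - identical behaviour:
    func process(v int) error {
        return input(v)
    }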
@@ -114,7 +114,7 @@ func (as *aggrStateAvgZero) Update(values []float64) {
 	as.seriesTotal++
 }
 
-func (as *aggrStateAvgZero) Finalize(xFilesFactor float64) []float64 {
+func (as *aggrStateAvgZero) Finalize(_ float64) []float64 {
 	sums := as.sums
 	values := make([]float64, as.pointsLen)
 	count := float64(as.seriesTotal)
@@ -6,7 +6,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
-	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 )
@@ -14,7 +13,7 @@ import (
 // FunctionsHandler implements /functions handler.
 //
 // See https://graphite.readthedocs.io/en/latest/functions.html#function-api
-func FunctionsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
+func FunctionsHandler(w http.ResponseWriter, r *http.Request) error {
 	grouped := httputils.GetBool(r, "grouped")
 	group := r.FormValue("group")
 	result := make(map[string]interface{})
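Dropping the unused startTime parameter also removes this file's last reference to the time package, which is why the import hunk directly above deletes "time": an unused import is a compile error in Go, not just a lint warning.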
@@ -40,7 +39,7 @@ func FunctionsHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
 // FunctionDetailsHandler implements /functions/<func_name> handler.
 //
 // See https://graphite.readthedocs.io/en/latest/functions.html#function-api
-func FunctionDetailsHandler(startTime time.Time, funcName string, w http.ResponseWriter, r *http.Request) error {
+func FunctionDetailsHandler(funcName string, w http.ResponseWriter, r *http.Request) error {
 	result := funcs[funcName]
 	if result == nil {
 		return fmt.Errorf("cannot find function %q", funcName)
@@ -85,7 +85,7 @@ func MetricsFindHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
 	if leavesOnly {
 		paths = filterLeaves(paths, delimiter)
 	}
-	paths = deduplicatePaths(paths, delimiter)
+	paths = deduplicatePaths(paths)
 	sortPaths(paths, delimiter)
 	contentType := getContentType(jsonp)
 	w.Header().Set("Content-Type", contentType)
@@ -99,7 +99,7 @@ func MetricsFindHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
 	return nil
 }
 
-func deduplicatePaths(paths []string, delimiter string) []string {
+func deduplicatePaths(paths []string) []string {
 	if len(paths) == 0 {
 		return nil
 	}
@@ -189,7 +189,7 @@ func init() {
 	}
 }
 
-func transformTODO(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
+func transformTODO(_ *evalConfig, _ *graphiteql.FuncExpr) (nextSeriesFunc, error) {
 	return nil, fmt.Errorf("TODO: implement this function")
 }
 
@@ -1062,7 +1062,7 @@ func transformCumulative(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFun
 	if err != nil {
 		return nil, err
 	}
-	return consolidateBy(ec, fe, nextSeries, "sum")
+	return consolidateBy(fe, nextSeries, "sum")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.consolidateBy
@@ -1079,10 +1079,10 @@ func transformConsolidateBy(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeries
 	if err != nil {
 		return nil, err
 	}
-	return consolidateBy(ec, fe, nextSeries, funcName)
+	return consolidateBy(fe, nextSeries, funcName)
 }
 
-func consolidateBy(ec *evalConfig, expr graphiteql.Expr, nextSeries nextSeriesFunc, funcName string) (nextSeriesFunc, error) {
+func consolidateBy(expr graphiteql.Expr, nextSeries nextSeriesFunc, funcName string) (nextSeriesFunc, error) {
 	consolidateFunc, err := getAggrFunc(funcName)
 	if err != nil {
 		return nil, err
@@ -1843,10 +1843,10 @@ func transformHighest(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc,
 	if err != nil {
 		return nil, err
 	}
-	return highestGeneric(ec, fe, nextSeries, n, funcName)
+	return highestGeneric(fe, nextSeries, n, funcName)
 }
 
-func highestGeneric(ec *evalConfig, expr graphiteql.Expr, nextSeries nextSeriesFunc, n float64, funcName string) (nextSeriesFunc, error) {
+func highestGeneric(expr graphiteql.Expr, nextSeries nextSeriesFunc, n float64, funcName string) (nextSeriesFunc, error) {
 	aggrFunc, err := getAggrFunc(funcName)
 	if err != nil {
 		_, _ = drainAllSeries(nextSeries)
@@ -1928,7 +1928,7 @@ func transformHighestAverage(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSerie
 	if err != nil {
 		return nil, err
 	}
-	return highestGeneric(ec, fe, nextSeries, n, "average")
+	return highestGeneric(fe, nextSeries, n, "average")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.highestCurrent
@@ -1945,7 +1945,7 @@ func transformHighestCurrent(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSerie
 	if err != nil {
 		return nil, err
 	}
-	return highestGeneric(ec, fe, nextSeries, n, "current")
+	return highestGeneric(fe, nextSeries, n, "current")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.highestMax
@@ -1962,7 +1962,7 @@ func transformHighestMax(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFun
 	if err != nil {
 		return nil, err
 	}
-	return highestGeneric(ec, fe, nextSeries, n, "max")
+	return highestGeneric(fe, nextSeries, n, "max")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.hitcount
@@ -2379,10 +2379,10 @@ func transformLowest(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, e
 	if err != nil {
 		return nil, err
 	}
-	return lowestGeneric(ec, fe, nextSeries, n, funcName)
+	return lowestGeneric(fe, nextSeries, n, funcName)
 }
 
-func lowestGeneric(ec *evalConfig, expr graphiteql.Expr, nextSeries nextSeriesFunc, n float64, funcName string) (nextSeriesFunc, error) {
+func lowestGeneric(expr graphiteql.Expr, nextSeries nextSeriesFunc, n float64, funcName string) (nextSeriesFunc, error) {
 	aggrFunc, err := getAggrFunc(funcName)
 	if err != nil {
 		_, _ = drainAllSeries(nextSeries)
@@ -2459,7 +2459,7 @@ func transformLowestAverage(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeries
 	if err != nil {
 		return nil, err
 	}
-	return lowestGeneric(ec, fe, nextSeries, n, "average")
+	return lowestGeneric(fe, nextSeries, n, "average")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.lowestCurrent
@@ -2476,7 +2476,7 @@ func transformLowestCurrent(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeries
 	if err != nil {
 		return nil, err
 	}
-	return lowestGeneric(ec, fe, nextSeries, n, "current")
+	return lowestGeneric(fe, nextSeries, n, "current")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.maxSeries
@@ -2607,7 +2607,7 @@ func transformMostDeviant(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFu
 	if err != nil {
 		return nil, err
 	}
-	return highestGeneric(ec, fe, nextSeries, n, "stddev")
+	return highestGeneric(fe, nextSeries, n, "stddev")
 }
 
 // See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.movingAverage
@@ -3862,7 +3862,11 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (*
 		}
 		if r.err != nil {
 			// Drain the rest of series before returning the error.
-			for range resultCh {
+			for {
+				_, ok := <-resultCh
+				if !ok {
+					break
+				}
 			}
 			<-errCh
 			return nil, r.err
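"for range resultCh { }" already drains the channel, but its empty body trips an empty-block check in the newer linter (a hedged guess at the rule; the diff does not name it), so the loop is rewritten with an explicit receive. The drain semantics are unchanged, as this standalone sketch shows:

    package example

    // drain discards values until ch is closed and emptied, exactly like
    // the rewritten loop above.
    func drain(ch <-chan int) {
        for {
            _, ok := <-ch
            if !ok {
                break
            }
        }
    }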
@@ -4733,7 +4737,7 @@ func transformSortByTotal(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFu
 	if err != nil {
 		return nil, err
 	}
-	return sortByGeneric(ec, fe, nextSeries, "sum", true)
+	return sortByGeneric(fe, nextSeries, "sum", true)
 }
 
 // https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.sortBy
@@ -4754,10 +4758,10 @@ func transformSortBy(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, e
 	if err != nil {
 		return nil, err
 	}
-	return sortByGeneric(ec, fe, nextSeries, funcName, reverse)
+	return sortByGeneric(fe, nextSeries, funcName, reverse)
 }
 
-func sortByGeneric(ec *evalConfig, fe *graphiteql.FuncExpr, nextSeries nextSeriesFunc, funcName string, reverse bool) (nextSeriesFunc, error) {
+func sortByGeneric(fe *graphiteql.FuncExpr, nextSeries nextSeriesFunc, funcName string, reverse bool) (nextSeriesFunc, error) {
 	aggrFunc, err := getAggrFunc(funcName)
 	if err != nil {
 		_, _ = drainAllSeries(nextSeries)
@@ -4868,7 +4872,7 @@ func transformSortByMinima(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesF
 		}
 		return s, nil
 	})
-	return sortByGeneric(ec, fe, f, "min", false)
+	return sortByGeneric(fe, f, "min", false)
 }
 
 // https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.sortByMaxima
@@ -4881,7 +4885,7 @@ func transformSortByMaxima(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesF
 	if err != nil {
 		return nil, err
 	}
-	return sortByGeneric(ec, fe, nextSeries, "max", true)
+	return sortByGeneric(fe, nextSeries, "max", true)
 }
 
 // https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.smartSummarize
@@ -5286,7 +5290,7 @@ func holtWinterConfidenceBands(ec *evalConfig, fe *graphiteql.FuncExpr, args []*
 	f := nextSeriesConcurrentWrapper(nextSeries, func(s *series) (*series, error) {
 		s.consolidate(&ecCopy, step)
 		timeStamps := s.Timestamps[trimWindowPoints:]
-		analysis := holtWintersAnalysis(&ecCopy, s, seasonalityMs)
+		analysis := holtWintersAnalysis(s, seasonalityMs)
 		forecastValues := analysis.predictions.Values[trimWindowPoints:]
 		deviationValues := analysis.deviations.Values[trimWindowPoints:]
 		valuesLen := len(forecastValues)
@@ -5450,7 +5454,7 @@ func transformHoltWintersForecast(ec *evalConfig, fe *graphiteql.FuncExpr) (next
 	trimWindowPoints := ecCopy.pointsLen(step) - ec.pointsLen(step)
 	f := nextSeriesConcurrentWrapper(nextSeries, func(s *series) (*series, error) {
 		s.consolidate(&ecCopy, step)
-		analysis := holtWintersAnalysis(&ecCopy, s, seasonalityMs)
+		analysis := holtWintersAnalysis(s, seasonalityMs)
 		predictions := analysis.predictions
 
 		s.Tags["holtWintersForecast"] = "1"
@@ -5468,7 +5472,7 @@ func transformHoltWintersForecast(ec *evalConfig, fe *graphiteql.FuncExpr) (next
 
 }
 
-func holtWintersAnalysis(ec *evalConfig, s *series, seasonality int64) holtWintersAnalysisResult {
+func holtWintersAnalysis(s *series, seasonality int64) holtWintersAnalysisResult {
 	alpha := 0.1
 	gamma := alpha
 	beta := 0.0035
@@ -236,7 +236,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	funcName = strings.TrimPrefix(funcName, "/")
 	if funcName == "" {
 		graphiteFunctionsRequests.Inc()
-		if err := graphite.FunctionsHandler(startTime, w, r); err != nil {
+		if err := graphite.FunctionsHandler(w, r); err != nil {
 			graphiteFunctionsErrors.Inc()
 			httpserver.Errorf(w, r, "%s", err)
 			return true
@@ -244,7 +244,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		return true
 	}
 	graphiteFunctionDetailsRequests.Inc()
-	if err := graphite.FunctionDetailsHandler(startTime, funcName, w, r); err != nil {
+	if err := graphite.FunctionDetailsHandler(funcName, w, r); err != nil {
 		graphiteFunctionDetailsErrors.Inc()
 		httpserver.Errorf(w, r, "%s", err)
 		return true
@@ -667,10 +667,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
 		qt.Donef("start=%d, end=%d", cp.start, cp.end)
 	}
 	WriteSeriesResponse(bw, metricNames, qt, qtDone)
-	if err := bw.Flush(); err != nil {
-		return err
-	}
-	return nil
+	return bw.Flush()
 }
 
 var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series"}`)
@@ -12,7 +12,7 @@ import (
 // ActiveQueriesHandler returns response to /api/v1/status/active_queries
 //
 // It writes a JSON with active queries to w.
-func ActiveQueriesHandler(w http.ResponseWriter, r *http.Request) {
+func ActiveQueriesHandler(w http.ResponseWriter, _ *http.Request) {
 	aqes := activeQueriesV.GetAll()
 
 	w.Header().Set("Content-Type", "application/json")
@@ -73,7 +73,7 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
 		}
 		qt.Printf("leave only the first point in every series")
 	}
-	maySort := maySortResults(e, rv)
+	maySort := maySortResults(e)
 	result, err := timeseriesToResult(rv, maySort)
 	if err != nil {
 		return nil, err
@@ -95,7 +95,7 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
 	return result, nil
 }
 
-func maySortResults(e metricsql.Expr, tss []*timeseries) bool {
+func maySortResults(e metricsql.Expr) bool {
 	switch v := e.(type) {
 	case *metricsql.FuncExpr:
 		switch strings.ToLower(v.Name) {
@@ -2222,7 +2222,7 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
 	return sum
 }
 
-func rollupFake(rfa *rollupFuncArg) float64 {
+func rollupFake(_ *rollupFuncArg) float64 {
 	logger.Panicf("BUG: rollupFake shouldn't be called")
 	return 0
 }
@@ -160,7 +160,7 @@ func TestDerivValues(t *testing.T) {
 	testRowsEqual(t, values, timestamps, valuesExpected, timestamps)
 }
 
-func testRollupFunc(t *testing.T, funcName string, args []interface{}, meExpected *metricsql.MetricExpr, vExpected float64) {
+func testRollupFunc(t *testing.T, funcName string, args []interface{}, vExpected float64) {
 	t.Helper()
 	nrf := getRollupFunc(funcName)
 	if nrf == nil {
@@ -203,7 +203,7 @@ func TestRollupDurationOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, maxIntervals}
-		testRollupFunc(t, "duration_over_time", args, &me, dExpected)
+		testRollupFunc(t, "duration_over_time", args, dExpected)
 	}
 	f(-123, 0)
 	f(0, 0)
@@ -224,7 +224,7 @@ func TestRollupShareLEOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, les}
-		testRollupFunc(t, "share_le_over_time", args, &me, vExpected)
+		testRollupFunc(t, "share_le_over_time", args, vExpected)
 	}
 
 	f(-123, 0)
@@ -247,7 +247,7 @@ func TestRollupShareGTOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, gts}
-		testRollupFunc(t, "share_gt_over_time", args, &me, vExpected)
+		testRollupFunc(t, "share_gt_over_time", args, vExpected)
 	}
 
 	f(-123, 1)
@@ -270,7 +270,7 @@ func TestRollupShareEQOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, eqs}
-		testRollupFunc(t, "share_eq_over_time", args, &me, vExpected)
+		testRollupFunc(t, "share_eq_over_time", args, vExpected)
 	}
 
 	f(-123, 0)
@@ -289,7 +289,7 @@ func TestRollupCountLEOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, les}
-		testRollupFunc(t, "count_le_over_time", args, &me, vExpected)
+		testRollupFunc(t, "count_le_over_time", args, vExpected)
 	}
 
 	f(-123, 0)
@@ -312,7 +312,7 @@ func TestRollupCountGTOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, gts}
-		testRollupFunc(t, "count_gt_over_time", args, &me, vExpected)
+		testRollupFunc(t, "count_gt_over_time", args, vExpected)
 	}
 
 	f(-123, 12)
@@ -335,7 +335,7 @@ func TestRollupCountEQOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, eqs}
-		testRollupFunc(t, "count_eq_over_time", args, &me, vExpected)
+		testRollupFunc(t, "count_eq_over_time", args, vExpected)
 	}
 
 	f(-123, 0)
@@ -354,7 +354,7 @@ func TestRollupCountNEOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, nes}
-		testRollupFunc(t, "count_ne_over_time", args, &me, vExpected)
+		testRollupFunc(t, "count_ne_over_time", args, vExpected)
 	}
 
 	f(-123, 12)
@@ -373,7 +373,7 @@ func TestRollupQuantileOverTime(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}}
-		testRollupFunc(t, "quantile_over_time", args, &me, vExpected)
+		testRollupFunc(t, "quantile_over_time", args, vExpected)
 	}
 
 	f(-123, math.Inf(-1))
@@ -395,7 +395,7 @@ func TestRollupPredictLinear(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, secs}
-		testRollupFunc(t, "predict_linear", args, &me, vExpected)
+		testRollupFunc(t, "predict_linear", args, vExpected)
 	}
 
 	f(0e-3, 65.07405077267295)
@@ -434,7 +434,7 @@ func TestRollupHoltWinters(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}, sfs, tfs}
-		testRollupFunc(t, "holt_winters", args, &me, vExpected)
+		testRollupFunc(t, "holt_winters", args, vExpected)
 	}
 
 	f(-1, 0.5, nan)
@@ -462,7 +462,7 @@ func TestRollupHoeffdingBoundLower(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}}
-		testRollupFunc(t, "hoeffding_bound_lower", args, &me, vExpected)
+		testRollupFunc(t, "hoeffding_bound_lower", args, vExpected)
 	}
 
 	f(0.5, 28.21949401521037)
@@ -483,7 +483,7 @@ func TestRollupHoeffdingBoundUpper(t *testing.T) {
 		}}
 		var me metricsql.MetricExpr
 		args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}}
-		testRollupFunc(t, "hoeffding_bound_upper", args, &me, vExpected)
+		testRollupFunc(t, "hoeffding_bound_upper", args, vExpected)
 	}
 
 	f(0.5, 65.9471726514563)
@@ -500,7 +500,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
 		t.Helper()
 		var me metricsql.MetricExpr
 		args := []interface{}{&metricsql.RollupExpr{Expr: &me}}
-		testRollupFunc(t, funcName, args, &me, vExpected)
+		testRollupFunc(t, funcName, args, vExpected)
 	}
 
 	f("default_rollup", 34)
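All of the rollup tests above funnel through the shared testRollupFunc helper, so deleting its unused meExpected parameter touches every call site in one sweep. The helper keeps t.Helper() so failures are attributed to the calling line; its rough shape (body elided, package name assumed):

    package promql

    import "testing"

    func testRollupFunc(t *testing.T, funcName string, args []interface{}, vExpected float64) {
        t.Helper() // report failures at the caller's line, not here
        // ... resolve funcName via getRollupFunc, evaluate it over args,
        // and compare the result against vExpected ...
    }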
@@ -1090,18 +1090,18 @@ func transformHour(t time.Time) int {
 	return t.Hour()
 }
 
-func runningSum(a, b float64, idx int) float64 {
+func runningSum(a, b float64, _ int) float64 {
 	return a + b
 }
 
-func runningMax(a, b float64, idx int) float64 {
+func runningMax(a, b float64, _ int) float64 {
 	if a > b {
 		return a
 	}
 	return b
 }
 
-func runningMin(a, b float64, idx int) float64 {
+func runningMin(a, b float64, _ int) float64 {
 	if a < b {
 		return a
 	}
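runningSum, runningMax, and runningMin keep their int argument rather than dropping it because they are passed as values of a common three-argument callback, so the arity is fixed and renaming to _ is the only available fix. Sketch of the constraint (the type name runningFunc is an assumption, not the repo's):

    package example

    // The callback type fixes the arity; an implementation that ignores the
    // index still has to declare it.
    type runningFunc func(a, b float64, idx int) float64

    func runningSum(a, b float64, _ int) float64 { return a + b }

    var _ runningFunc = runningSum // compile-time signature check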
@@ -121,7 +121,7 @@ func TestCache(t *testing.T) {
 	}
 }
 
-func TestCacheConcurrentAccess(t *testing.T) {
+func TestCacheConcurrentAccess(_ *testing.T) {
 	const sizeMaxBytes = 16 * 1024 * 1024
 	getMaxSize := func() int {
 		return sizeMaxBytes
@@ -7,7 +7,7 @@ import (
 )
 
 func TestInternStringSerial(t *testing.T) {
-	if err := testInternString(t); err != nil {
+	if err := testInternString(); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 }
@@ -17,7 +17,7 @@ func TestInternStringConcurrent(t *testing.T) {
 	resultCh := make(chan error, concurrency)
 	for i := 0; i < concurrency; i++ {
 		go func() {
-			resultCh <- testInternString(t)
+			resultCh <- testInternString()
 		}()
 	}
 	timer := time.NewTimer(5 * time.Second)
@@ -33,7 +33,7 @@ func TestInternStringConcurrent(t *testing.T) {
 	}
 }
 
-func testInternString(t *testing.T) error {
+func testInternString() error {
 	for i := 0; i < 1000; i++ {
 		s := fmt.Sprintf("foo_%d", i)
 		s1 := InternString(s)
@@ -120,7 +120,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
 		case "_stream":
 			bs.br.addStreamColumn(bs)
 		case "_time":
-			bs.br.addTimeColumn(bs)
+			bs.br.addTimeColumn()
 		default:
 			v := bs.csh.getConstColumnValue(columnName)
 			if v != "" {
@@ -452,7 +452,7 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *filterBi
 	br.valuesBuf = valuesBuf
 }
 
-func (br *blockResult) addTimeColumn(bs *blockSearch) {
+func (br *blockResult) addTimeColumn() {
 	br.cs = append(br.cs, blockResultColumn{
 		isTime: true,
 	})
@@ -155,11 +155,11 @@ func (nf *noopFilter) String() string {
 	return ""
 }
 
-func (nf *noopFilter) updateReferencedColumnNames(m map[string]struct{}) {
+func (nf *noopFilter) updateReferencedColumnNames(_ map[string]struct{}) {
 	// nothing to do
 }
 
-func (nf *noopFilter) apply(bs *blockSearch, bm *filterBitmap) {
+func (nf *noopFilter) apply(_ *blockSearch, _ *filterBitmap) {
 	// nothing to do
 }
 
@@ -7,7 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 )
 
-func TestStorageLifecycle(t *testing.T) {
+func TestStorageLifecycle(_ *testing.T) {
 	const path = "TestStorageLifecycle"
 
 	for i := 0; i < 3; i++ {
@@ -85,7 +85,7 @@ func TestCache(t *testing.T) {
 	}
 }
 
-func TestCacheConcurrentAccess(t *testing.T) {
+func TestCacheConcurrentAccess(_ *testing.T) {
 	const sizeMaxBytes = 16 * 1024 * 1024
 	getMaxSize := func() int {
 		return sizeMaxBytes
@@ -662,14 +662,14 @@ func (tb *Table) flushInmemoryParts(isFinal bool) {
 func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
 	tb.rawItemsPendingFlushesWG.Add(1)
 	for i := range riss.shards {
-		dst = riss.shards[i].appendBlocksToFlush(dst, tb, isFinal)
+		dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
 	}
 	tb.flushBlocksToParts(dst, isFinal)
 	tb.rawItemsPendingFlushesWG.Done()
 	return dst
 }
 
-func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock {
+func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
 	currentTime := fasttime.UnixTimestamp()
 	flushSeconds := int64(pendingItemsFlushInterval.Seconds())
 	if flushSeconds <= 0 {
@@ -62,10 +62,9 @@ func (sc *statConn) Read(p []byte) (int, error) {
 	if err != nil && err != io.EOF {
 		var ne net.Error
 		if errors.As(err, &ne) && ne.Timeout() {
-			if fasttime.UnixTimestamp()-startTime <= 1 {
 			// Ignore artificial timeout generated by net/http.Server
 			// See https://cs.opensource.google/go/go/+/refs/tags/go1.20.5:src/net/http/server.go;l=701
-			} else {
+			if fasttime.UnixTimestamp()-startTime > 1 {
 				sc.cm.readTimeouts.Inc()
 			}
 		} else {
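The statConn.Read rewrite removes a branch whose body was only a comment: rather than keep an empty if arm for documentation, the condition is inverted so the comment sits on the branch that does the counting. A behaviour-preserving sketch:

    package example

    type counter uint64

    func (c *counter) Inc() { *c++ }

    var readTimeouts counter

    // noteTimeout counts only real read timeouts; timeouts within ~1s are the
    // artificial ones generated by net/http.Server and are deliberately skipped.
    func noteTimeout(elapsedSeconds uint64) {
        if elapsedSeconds > 1 {
            readTimeouts.Inc()
        }
    }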
@@ -7,7 +7,7 @@ import (
 	"time"
 )
 
-func TestFastQueueOpenClose(t *testing.T) {
+func TestFastQueueOpenClose(_ *testing.T) {
 	path := "fast-queue-open-close"
 	mustDeleteDir(path)
 	for i := 0; i < 10; i++ {
@@ -31,7 +31,7 @@ type SDConfig struct {
 }
 
 // GetLabels returns DNS labels according to sdc.
-func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
+func (sdc *SDConfig) GetLabels(_ string) ([]*promutils.Labels, error) {
 	if len(sdc.Names) == 0 {
 		return nil, fmt.Errorf("`names` cannot be empty in `dns_sd_config`")
 	}
@@ -35,7 +35,7 @@ type SDConfig struct {
 }
 
 // GetLabels returns ec2 labels according to sdc.
-func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
+func (sdc *SDConfig) GetLabels(_ string) ([]*promutils.Labels, error) {
 	cfg, err := getAPIConfig(sdc)
 	if err != nil {
 		return nil, fmt.Errorf("cannot get API config: %w", err)
@@ -62,7 +62,7 @@ func (z ZoneYAML) MarshalYAML() (interface{}, error) {
 }
 
 // GetLabels returns gce labels according to sdc.
-func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
+func (sdc *SDConfig) GetLabels(_ string) ([]*promutils.Labels, error) {
 	cfg, err := getAPIConfig(sdc)
 	if err != nil {
 		return nil, fmt.Errorf("cannot get API config: %w", err)
@@ -91,7 +91,7 @@ type HTTPIngressPath struct {
 // getTargetLabels returns labels for ig.
 //
 // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
-func (ig *Ingress) getTargetLabels(gw *groupWatcher) []*promutils.Labels {
+func (ig *Ingress) getTargetLabels(_ *groupWatcher) []*promutils.Labels {
 	var ms []*promutils.Labels
 	for _, r := range ig.Spec.Rules {
 		paths := getIngressRulePaths(r.HTTP.Paths)
@@ -85,7 +85,7 @@ type NodeDaemonEndpoints struct {
 // getTargetLabels returns labels for the given n.
 //
 // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
-func (n *Node) getTargetLabels(gw *groupWatcher) []*promutils.Labels {
+func (n *Node) getTargetLabels(_ *groupWatcher) []*promutils.Labels {
 	addr := getNodeAddr(n.Status.Addresses)
 	if len(addr) == 0 {
 		// Skip node without address
@@ -73,7 +73,7 @@ type ServicePort struct {
 // getTargetLabels returns labels for each port of the given s.
 //
 // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
-func (s *Service) getTargetLabels(gw *groupWatcher) []*promutils.Labels {
+func (s *Service) getTargetLabels(_ *groupWatcher) []*promutils.Labels {
 	host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
 	var ms []*promutils.Labels
 	for _, sp := range s.Spec.Ports {
@@ -63,8 +63,5 @@ func testSanitizeLabelName() error {
 	if err := f("foo", "foo"); err != nil {
 		return err
 	}
-	if err := f("foo-bar/baz", "foo_bar_baz"); err != nil {
-		return err
-	}
-	return nil
+	return f("foo-bar/baz", "foo_bar_baz")
 }
@@ -15,7 +15,7 @@ import (
 	"github.com/VictoriaMetrics/metrics"
 )
 
-func statStdDial(ctx context.Context, networkUnused, addr string) (net.Conn, error) {
+func statStdDial(ctx context.Context, _, addr string) (net.Conn, error) {
 	d := getStdDialer()
 	network := netutil.GetTCPNetwork()
 	conn, err := d.DialContext(ctx, network, addr)
@@ -52,19 +52,19 @@ func TestReadLinesBlockFailure(t *testing.T) {
 
 type failureReader struct{}
 
-func (fr *failureReader) Read(p []byte) (int, error) {
+func (fr *failureReader) Read(_ []byte) (int, error) {
 	return 0, fmt.Errorf("some error")
 }
 
 type unexpectedEOF struct{}
 
-func (un unexpectedEOF) Read(p []byte) (int, error) {
+func (un unexpectedEOF) Read(_ []byte) (int, error) {
 	return 0, io.ErrUnexpectedEOF
 }
 
 type eofErr struct{}
 
-func (eo eofErr) Read(p []byte) (int, error) {
+func (eo eofErr) Read(_ []byte) (int, error) {
 	return 0, io.EOF
 }
 
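The stub readers above never touch the buffer, but io.Reader fixes the Read signature, so the parameter can only be renamed, never removed. The same idea with an explicit conformance check (package name assumed):

    package example

    import (
        "fmt"
        "io"
    )

    type failureReader struct{}

    // Read must keep io.Reader's exact signature even though the buffer is unused.
    func (fr *failureReader) Read(_ []byte) (int, error) {
        return 0, fmt.Errorf("some error")
    }

    var _ io.Reader = (*failureReader)(nil) // compile-time interface check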
@@ -61,7 +61,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline
 	bsm.nextBlockNoop = true
 }
 
-func (bsm *blockStreamMerger) getRetentionDeadline(bh *blockHeader) int64 {
+func (bsm *blockStreamMerger) getRetentionDeadline(_ *blockHeader) int64 {
 	return bsm.retentionDeadline
 }
 
@@ -128,7 +128,7 @@ func testBlockStreamReaderReadRows(mp *inmemoryPart, rows []rawRow) error {
 func testBlocksStreamReader(t *testing.T, rows []rawRow, expectedBlocksCount int) {
 	t.Helper()
 
-	bsr := newTestBlockStreamReader(t, rows)
+	bsr := newTestBlockStreamReader(rows)
 	blocksCount := 0
 	rowsCount := 0
 	for bsr.NextBlock() {
@@ -151,7 +151,7 @@ func testBlocksStreamReader(t *testing.T, rows []rawRow, expectedBlocksCount int
 	}
 }
 
-func newTestBlockStreamReader(t *testing.T, rows []rawRow) *blockStreamReader {
+func newTestBlockStreamReader(rows []rawRow) *blockStreamReader {
 	var mp inmemoryPart
 	mp.InitFromRows(rows)
 	var bsr blockStreamReader
@@ -1432,7 +1432,7 @@ func (th *topHeap) Swap(i, j int) {
 	a[j], a[i] = a[i], a[j]
 }
 
-func (th *topHeap) Push(x interface{}) {
+func (th *topHeap) Push(_ interface{}) {
 	panic(fmt.Errorf("BUG: Push shouldn't be called"))
 }
 
@@ -14,7 +14,7 @@ func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr := newTestBlockStreamReader(t, rows)
+	bsr := newTestBlockStreamReader(rows)
 	bsrs := []*blockStreamReader{bsr}
 	testMergeBlockStreams(t, bsrs, 1, 1, rows[0].Timestamp, rows[0].Timestamp)
 }
@@ -38,7 +38,7 @@ func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr := newTestBlockStreamReader(t, rows)
+	bsr := newTestBlockStreamReader(rows)
 	bsrs := []*blockStreamReader{bsr}
 	testMergeBlockStreams(t, bsrs, 1, maxRowsPerBlock, minTimestamp, maxTimestamp)
 }
@@ -65,7 +65,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr := newTestBlockStreamReader(t, rows)
+	bsr := newTestBlockStreamReader(rows)
 	bsrs := []*blockStreamReader{bsr}
 	testMergeBlockStreams(t, bsrs, blocksCount, blocksCount, minTimestamp, maxTimestamp)
 }
@@ -93,7 +93,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr := newTestBlockStreamReader(t, rows)
+	bsr := newTestBlockStreamReader(rows)
 	bsrs := []*blockStreamReader{bsr}
 	testMergeBlockStreams(t, bsrs, blocksCount, rowsCount, minTimestamp, maxTimestamp)
 }
@@ -107,8 +107,8 @@ func TestMergeBlockStreamsTwoStreamsOneBlockTwoRows(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr1 := newTestBlockStreamReader(t, rows)
-	bsr2 := newTestBlockStreamReader(t, rows)
+	bsr1 := newTestBlockStreamReader(rows)
+	bsr2 := newTestBlockStreamReader(rows)
 	bsrs := []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, 1, 2, rows[0].Timestamp, rows[0].Timestamp)
 
@@ -122,7 +122,7 @@ func TestMergeBlockStreamsTwoStreamsOneBlockTwoRows(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr1 = newTestBlockStreamReader(t, rows)
+	bsr1 = newTestBlockStreamReader(rows)
 	rows = []rawRow{
 		{
 			Timestamp: minTimestamp,
@@ -130,7 +130,7 @@ func TestMergeBlockStreamsTwoStreamsOneBlockTwoRows(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr2 = newTestBlockStreamReader(t, rows)
+	bsr2 = newTestBlockStreamReader(rows)
 	bsrs = []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, 1, 2, minTimestamp, maxTimestamp)
 }
@@ -149,7 +149,7 @@ func TestMergeBlockStreamsTwoStreamsTwoBlocksOneRow(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr1 := newTestBlockStreamReader(t, rows)
+	bsr1 := newTestBlockStreamReader(rows)
 
 	rows = []rawRow{
 		{
@@ -161,7 +161,7 @@ func TestMergeBlockStreamsTwoStreamsTwoBlocksOneRow(t *testing.T) {
 			PrecisionBits: defaultPrecisionBits,
 		},
 	}
-	bsr2 := newTestBlockStreamReader(t, rows)
+	bsr2 := newTestBlockStreamReader(rows)
 
 	bsrs := []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, 2, 2, minTimestamp, maxTimestamp)
@@ -191,7 +191,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr1 := newTestBlockStreamReader(t, rows)
+	bsr1 := newTestBlockStreamReader(rows)
 
 	rows = rows[:0]
 	const rowsCount2 = 3281
@@ -208,7 +208,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr2 := newTestBlockStreamReader(t, rows)
+	bsr2 := newTestBlockStreamReader(rows)
 
 	bsrs := []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, blocksCount, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
@@ -235,7 +235,7 @@ func TestMergeBlockStreamsTwoStreamsBigOverlappingBlocks(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr1 := newTestBlockStreamReader(t, rows)
+	bsr1 := newTestBlockStreamReader(rows)
 
 	rows = rows[:0]
 	const rowsCount2 = maxRowsPerBlock + 2344
@@ -251,7 +251,7 @@ func TestMergeBlockStreamsTwoStreamsBigOverlappingBlocks(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr2 := newTestBlockStreamReader(t, rows)
+	bsr2 := newTestBlockStreamReader(rows)
 
 	bsrs := []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
@@ -279,7 +279,7 @@ func TestMergeBlockStreamsTwoStreamsBigSequentialBlocks(t *testing.T) {
 		}
 	}
 	maxTimestampB1 := rows[len(rows)-1].Timestamp
-	bsr1 := newTestBlockStreamReader(t, rows)
+	bsr1 := newTestBlockStreamReader(rows)
 
 	rows = rows[:0]
 	const rowsCount2 = maxRowsPerBlock - 233
@@ -295,7 +295,7 @@ func TestMergeBlockStreamsTwoStreamsBigSequentialBlocks(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-	bsr2 := newTestBlockStreamReader(t, rows)
+	bsr2 := newTestBlockStreamReader(rows)
 
 	bsrs := []*blockStreamReader{bsr1, bsr2}
 	testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
@@ -329,7 +329,7 @@ func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-		bsr := newTestBlockStreamReader(t, rows)
+		bsr := newTestBlockStreamReader(rows)
 		bsrs = append(bsrs, bsr)
 		rowsCount += rowsPerStream
 	}
@@ -363,7 +363,7 @@ func TestMergeForciblyStop(t *testing.T) {
 			maxTimestamp = r.Timestamp
 		}
 	}
-		bsr := newTestBlockStreamReader(t, rows)
+		bsr := newTestBlockStreamReader(rows)
 		bsrs = append(bsrs, bsr)
 	}
 
@@ -881,13 +881,13 @@ func (pt *partition) flushInmemoryParts(isFinal bool) {
 
 func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []rawRow {
 	for i := range rrss.shards {
-		dst = rrss.shards[i].appendRawRowsToFlush(dst, pt, isFinal)
+		dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal)
 	}
 	pt.flushRowsToParts(dst)
 	return dst
 }
 
-func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFinal bool) []rawRow {
+func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {
 	currentTime := fasttime.UnixTimestamp()
 	flushSeconds := int64(pendingRowsFlushInterval.Seconds())
 	if flushSeconds <= 0 {
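The storage code gets the same treatment: appendRawRowsToFlush received the owning *partition yet never used it, because the partition-level work (pt.flushRowsToParts) happens in the caller. Dropping the parameter narrows each function to the state it actually reads. A hedged sketch of that shape, with simplified stand-in types:

package main

type rawRow struct{ value float64 }

type shard struct{ pending []rawRow }

// appendPending drains the shard's buffered rows into dst; it needs no
// reference to the owning partition, so none is passed.
func (s *shard) appendPending(dst []rawRow) []rawRow {
	dst = append(dst, s.pending...)
	s.pending = s.pending[:0]
	return dst
}

type partition struct{ shards []shard }

// flush gathers rows from every shard, then hands them to the partition,
// which is the only place partition-level state is actually needed.
func (pt *partition) flush(dst []rawRow) []rawRow {
	for i := range pt.shards {
		dst = pt.shards[i].appendPending(dst)
	}
	pt.writeRows(dst)
	return dst
}

func (pt *partition) writeRows(rows []rawRow) { /* partition-level work */ }

func main() {
	pt := &partition{shards: make([]shard, 4)}
	pt.shards[0].pending = []rawRow{{value: 1}}
	_ = pt.flush(nil)
}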
@@ -84,7 +84,6 @@ func TestSearch(t *testing.T) {
 	const rowsCount = 2e4
 	const rowsPerBlock = 1e3
 	const metricGroupsCount = rowsCount / 5
-	const accountsCount = 2
 
 	mrs := make([]MetricRow, rowsCount)
 	var mn MetricName
@@ -127,7 +126,7 @@ func TestSearch(t *testing.T) {
 	}
 
 	t.Run("serial", func(t *testing.T) {
-		if err := testSearchInternal(st, tr, mrs, accountsCount); err != nil {
+		if err := testSearchInternal(st, tr, mrs); err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
 	})
@@ -136,7 +135,7 @@ func TestSearch(t *testing.T) {
 		ch := make(chan error, 3)
 		for i := 0; i < cap(ch); i++ {
 			go func() {
-				ch <- testSearchInternal(st, tr, mrs, accountsCount)
+				ch <- testSearchInternal(st, tr, mrs)
 			}()
 		}
 		var firstError error
@@ -156,7 +155,7 @@ func TestSearch(t *testing.T) {
 	})
 }
 
-func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error {
+func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow) error {
 	var s Search
 	for i := 0; i < 10; i++ {
 		// Prepare TagFilters for search.
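Removing accountsCount touches the constant, both call sites, and the helper's signature in one commit; the compiler enforces that consistency, which is what makes this kind of lint-driven cleanup safe to apply mechanically. The hunks also show why testSearchInternal returns an error instead of failing the test itself: the concurrent subtest invokes it from goroutines, where t.Fatalf must not be called. A minimal sketch of that harness pattern (checkSearch is a stand-in, not the real helper):

package main

import (
	"fmt"
	"testing"
)

// checkSearch stands in for testSearchInternal: it returns an error rather
// than calling t.Fatalf, so it is safe to run from multiple goroutines.
func checkSearch() error { return nil }

func TestSearchSketch(t *testing.T) {
	t.Run("serial", func(t *testing.T) {
		if err := checkSearch(); err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
	})
	t.Run("concurrent", func(t *testing.T) {
		ch := make(chan error, 3)
		for i := 0; i < cap(ch); i++ {
			go func() {
				ch <- checkSearch()
			}()
		}
		var firstError error
		for i := 0; i < cap(ch); i++ {
			if err := <-ch; err != nil && firstError == nil {
				firstError = fmt.Errorf("concurrent search failed: %w", err)
			}
		}
		if firstError != nil {
			t.Fatal(firstError)
		}
	})
}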
@@ -22,7 +22,7 @@ func newAvgAggrState() *avgAggrState {
 	return &avgAggrState{}
 }
 
-func (as *avgAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *avgAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
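This hunk and the ones that follow apply the other fix for an unused parameter: every aggregation state exposes an identical pushSample signature, so the parameter cannot be deleted without breaking the shared contract, and it is renamed to the blank identifier _ instead. The interface in this sketch is an assumption inferred from the uniform signatures, not the actual VictoriaMetrics declaration:

package main

import (
	"fmt"
	"sync"
)

// aggrState is a guessed-at shape for the contract implied by the uniform
// pushSample signatures; the real declaration may differ.
type aggrState interface {
	pushSample(inputKey, outputKey string, value float64)
}

type sumState struct{ m sync.Map }

// sumState ignores inputKey, so it is blanked with _; the signature must
// still match the interface, which is why the parameter cannot be deleted.
func (as *sumState) pushSample(_, outputKey string, value float64) {
	v, _ := as.m.LoadOrStore(outputKey, new(float64))
	*v.(*float64) += value // illustrative only: not goroutine-safe
}

func main() {
	var s aggrState = &sumState{}
	s.pushSample("ignored", "requests_total", 3)
	s.pushSample("ignored", "requests_total", 4)
	v, _ := s.(*sumState).m.Load("requests_total")
	fmt.Println(*v.(*float64)) // 7
}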
@@ -21,7 +21,7 @@ func newCountSamplesAggrState() *countSamplesAggrState {
 	return &countSamplesAggrState{}
 }
 
-func (as *countSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -22,7 +22,7 @@ func newCountSeriesAggrState() *countSeriesAggrState {
 	return &countSeriesAggrState{}
 }
 
-func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
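Note the asymmetry in the hunk above: countSeriesAggrState keeps inputKey and blanks only the value, which makes sense if counting unique series requires the identity of the incoming series even though its numeric sample is irrelevant. A sketch of why the parameter survives, with illustrative field names rather than the real implementation:

package main

import "fmt"

// countSeriesState remembers which input keys were seen per output key;
// the field layout here is a guess, not the actual VictoriaMetrics code.
type countSeriesState struct {
	seen map[string]map[string]struct{} // outputKey -> set of inputKeys
}

func (as *countSeriesState) pushSample(inputKey, outputKey string, _ float64) {
	set, ok := as.seen[outputKey]
	if !ok {
		set = make(map[string]struct{})
		as.seen[outputKey] = set
	}
	set[inputKey] = struct{}{} // the sample value is irrelevant; the series identity is not
}

func main() {
	as := &countSeriesState{seen: map[string]map[string]struct{}{}}
	as.pushSample(`cpu{host="a"}`, "cpu", 1)
	as.pushSample(`cpu{host="a"}`, "cpu", 2) // same series, counted once
	as.pushSample(`cpu{host="b"}`, "cpu", 9)
	fmt.Println(len(as.seen["cpu"])) // 2
}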
@@ -30,7 +30,7 @@ func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBuck
 	}
 }
 
-func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *histogramBucketAggrState) pushSample(_, outputKey string, value float64) {
 	currentTime := fasttime.UnixTimestamp()
 	deleteDeadline := currentTime + as.stalenessSecs
 
@@ -21,7 +21,7 @@ func newLastAggrState() *lastAggrState {
 	return &lastAggrState{}
 }
 
-func (as *lastAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *lastAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -21,7 +21,7 @@ func newMaxAggrState() *maxAggrState {
 	return &maxAggrState{}
 }
 
-func (as *maxAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *maxAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -21,7 +21,7 @@ func newMinAggrState() *minAggrState {
 	return &minAggrState{}
 }
 
-func (as *minAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *minAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -28,7 +28,7 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
 	}
 }
 
-func (as *quantilesAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *quantilesAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -24,7 +24,7 @@ func newStddevAggrState() *stddevAggrState {
 	return &stddevAggrState{}
 }
 
-func (as *stddevAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *stddevAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -23,7 +23,7 @@ func newStdvarAggrState() *stdvarAggrState {
 	return &stdvarAggrState{}
 }
 
-func (as *stdvarAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {
@@ -21,7 +21,7 @@ func newSumSamplesAggrState() *sumSamplesAggrState {
 	return &sumSamplesAggrState{}
 }
 
-func (as *sumSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
+func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) {
 again:
 	v, ok := as.m.Load(outputKey)
 	if !ok {