2023-04-01 06:35:26 +00:00
package graphite
import (
"flag"
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphiteql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
var maxGraphiteSeries = flag . Int ( "search.maxGraphiteSeries" , 300e3 , "The maximum number of time series, which can be scanned during queries to Graphite Render API. " +
"See https://docs.victoriametrics.com/#graphite-render-api-usage" )
type evalConfig struct {
at * auth . Token
startTime int64
endTime int64
storageStep int64
deadline searchutils . Deadline
currentTime time . Time
// xFilesFactor is used for determining when consolidateFunc must be applied.
//
// 0 means that consolidateFunc should be applied if at least a single non-NaN data point exists on the given step.
// 1 means that consolidateFunc should be applied if all the data points are non-NaN on the given step.
xFilesFactor float64
// Enforced tag filters
etfs [ ] [ ] storage . TagFilter
// originalQuery contains the original query - used for debug logging.
originalQuery string
}
func ( ec * evalConfig ) pointsLen ( step int64 ) int {
return int ( ( ec . endTime - ec . startTime ) / step )
}
func ( ec * evalConfig ) newTimestamps ( step int64 ) [ ] int64 {
pointsLen := ec . pointsLen ( step )
timestamps := make ( [ ] int64 , pointsLen )
ts := ec . startTime
for i := 0 ; i < pointsLen ; i ++ {
timestamps [ i ] = ts
ts += step
}
return timestamps
}
type series struct {
Name string
Tags map [ string ] string
Timestamps [ ] int64
Values [ ] float64
// holds current path expression like graphite does.
pathExpression string
expr graphiteql . Expr
// consolidateFunc is applied to raw samples in order to generate data points algined to the given step.
// see series.consolidate() function for details.
consolidateFunc aggrFunc
// xFilesFactor is used for determining when consolidateFunc must be applied.
//
// 0 means that consolidateFunc should be applied if at least a single non-NaN data point exists on the given step.
// 1 means that consolidateFunc should be applied if all the data points are non-NaN on the given step.
xFilesFactor float64
step int64
}
func ( s * series ) consolidate ( ec * evalConfig , step int64 ) {
aggrFunc := s . consolidateFunc
if aggrFunc == nil {
aggrFunc = aggrAvg
}
xFilesFactor := s . xFilesFactor
if s . xFilesFactor <= 0 {
xFilesFactor = ec . xFilesFactor
}
s . summarize ( aggrFunc , ec . startTime , ec . endTime , step , xFilesFactor )
}
func ( s * series ) summarize ( aggrFunc aggrFunc , startTime , endTime , step int64 , xFilesFactor float64 ) {
pointsLen := int ( ( endTime - startTime ) / step )
timestamps := s . Timestamps
values := s . Values
dstTimestamps := make ( [ ] int64 , 0 , pointsLen )
dstValues := make ( [ ] float64 , 0 , pointsLen )
ts := startTime
i := 0
for len ( dstTimestamps ) < pointsLen {
tsEnd := ts + step
j := i
for j < len ( timestamps ) && timestamps [ j ] < tsEnd {
j ++
}
if i == j && i > 0 && ts - timestamps [ i - 1 ] <= 2000 {
// The current [ts ... tsEnd) interval has no samples,
// but the last sample on the previous interval [ts - step ... ts)
// is closer than 2 seconds to the current interval.
// Let's consider that this sample belongs to the current interval,
// since such discrepancy could appear because of small jitter in samples' ingestion.
i --
}
v := aggrFunc . apply ( xFilesFactor , values [ i : j ] )
dstTimestamps = append ( dstTimestamps , ts )
dstValues = append ( dstValues , v )
ts = tsEnd
i = j
}
// Do not reuse s.Timestamps and s.Values, since they can be too big
s . Timestamps = dstTimestamps
s . Values = dstValues
s . step = step
}
func execExpr ( ec * evalConfig , query string ) ( nextSeriesFunc , error ) {
expr , err := graphiteql . Parse ( query )
if err != nil {
return nil , fmt . Errorf ( "cannot parse %q: %w" , query , err )
}
return evalExpr ( ec , expr )
}
func evalExpr ( ec * evalConfig , expr graphiteql . Expr ) ( nextSeriesFunc , error ) {
switch t := expr . ( type ) {
case * graphiteql . MetricExpr :
return evalMetricExpr ( ec , t )
case * graphiteql . FuncExpr :
return evalFuncExpr ( ec , t )
default :
return nil , fmt . Errorf ( "unexpected expression type %T; want graphiteql.MetricExpr or graphiteql.FuncExpr; expr: %q" , t , t . AppendString ( nil ) )
}
}
func evalMetricExpr ( ec * evalConfig , me * graphiteql . MetricExpr ) ( nextSeriesFunc , error ) {
tfs := [ ] storage . TagFilter { {
Key : [ ] byte ( "__graphite__" ) ,
Value : [ ] byte ( me . Query ) ,
} }
tfss := joinTagFilterss ( tfs , ec . etfs )
sq := storage . NewSearchQuery ( ec . at . AccountID , ec . at . ProjectID , ec . startTime , ec . endTime , tfss , * maxGraphiteSeries )
return newNextSeriesForSearchQuery ( ec , sq , me )
}
func newNextSeriesForSearchQuery ( ec * evalConfig , sq * storage . SearchQuery , expr graphiteql . Expr ) ( nextSeriesFunc , error ) {
denyPartialResponse := true
rss , _ , err := netstorage . ProcessSearchQuery ( nil , denyPartialResponse , sq , ec . deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
seriesCh := make ( chan * series , cgroup . AvailableCPUs ( ) )
errCh := make ( chan error , 1 )
go func ( ) {
2024-04-02 20:16:24 +00:00
err := rss . RunParallel ( nil , func ( rs * netstorage . Result , _ uint ) error {
2023-04-01 06:35:26 +00:00
nameWithTags := getCanonicalPath ( & rs . MetricName )
tags := unmarshalTags ( nameWithTags )
s := & series {
Name : tags [ "name" ] ,
Tags : tags ,
Timestamps : append ( [ ] int64 { } , rs . Timestamps ... ) ,
Values : append ( [ ] float64 { } , rs . Values ... ) ,
expr : expr ,
pathExpression : string ( expr . AppendString ( nil ) ) ,
}
s . summarize ( aggrAvg , ec . startTime , ec . endTime , ec . storageStep , 0 )
t := timerpool . Get ( 30 * time . Second )
select {
case seriesCh <- s :
case <- t . C :
logger . Errorf ( "resource leak when processing the %s (full query: %s); please report this error to VictoriaMetrics developers" ,
expr . AppendString ( nil ) , ec . originalQuery )
}
timerpool . Put ( t )
return nil
} )
close ( seriesCh )
errCh <- err
} ( )
f := func ( ) ( * series , error ) {
s := <- seriesCh
if s != nil {
return s , nil
}
err := <- errCh
return nil , err
}
return f , nil
}
func evalFuncExpr ( ec * evalConfig , fe * graphiteql . FuncExpr ) ( nextSeriesFunc , error ) {
// Do not lowercase the fe.FuncName, since Graphite function names are case-sensitive.
tf := transformFuncs [ fe . FuncName ]
if tf == nil {
return nil , fmt . Errorf ( "unknown function %q" , fe . FuncName )
}
nextSeries , err := tf ( ec , fe )
if err != nil {
return nil , fmt . Errorf ( "cannot evaluate %s: %w" , fe . AppendString ( nil ) , err )
}
return nextSeries , nil
}