2023-06-20 05:55:12 +00:00
package logsql
import (
2024-05-12 14:33:29 +00:00
"context"
2024-05-15 02:55:44 +00:00
"fmt"
"math"
2023-06-20 05:55:12 +00:00
"net/http"
2024-09-06 21:00:55 +00:00
"slices"
2024-05-20 02:08:30 +00:00
"sort"
2024-06-28 01:08:37 +00:00
"strconv"
2024-05-20 02:08:30 +00:00
"strings"
"sync"
2024-05-15 02:55:44 +00:00
"time"
2023-06-20 05:55:12 +00:00
2024-06-27 12:18:42 +00:00
"github.com/VictoriaMetrics/metrics"
2023-06-20 05:55:12 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2024-02-18 20:58:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
2024-06-27 13:05:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-06-20 05:55:12 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
2024-05-15 02:55:44 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2023-06-20 05:55:12 +00:00
)
2024-05-20 02:08:30 +00:00
// ProcessHitsRequest handles /select/logsql/hits request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats
func ProcessHitsRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2023-06-20 05:55:12 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Obtain step
stepStr := r . FormValue ( "step" )
if stepStr == "" {
stepStr = "1d"
}
step , err := promutils . ParseDuration ( stepStr )
2023-06-20 05:55:12 +00:00
if err != nil {
2024-05-20 02:08:30 +00:00
httpserver . Errorf ( w , r , "cannot parse 'step' arg: %s" , err )
2023-06-20 05:55:12 +00:00
return
}
2024-05-20 02:08:30 +00:00
if step <= 0 {
httpserver . Errorf ( w , r , "'step' must be bigger than zero" )
2024-09-06 22:41:44 +00:00
return
2024-05-20 02:08:30 +00:00
}
2024-02-18 21:01:34 +00:00
2024-05-20 02:08:30 +00:00
// Obtain offset
offsetStr := r . FormValue ( "offset" )
if offsetStr == "" {
offsetStr = "0s"
}
offset , err := promutils . ParseDuration ( offsetStr )
if err != nil {
httpserver . Errorf ( w , r , "cannot parse 'offset' arg: %s" , err )
return
}
// Obtain field entries
fields := r . Form [ "field" ]
2024-06-28 01:08:37 +00:00
// Obtain limit on the number of top fields entries.
fieldsLimit , err := httputils . GetInt ( r , "fields_limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if fieldsLimit < 0 {
fieldsLimit = 0
}
2024-07-09 22:39:16 +00:00
// Prepare the query for hits count.
2024-05-20 02:08:30 +00:00
q . Optimize ( )
2024-07-09 22:39:16 +00:00
q . DropAllPipes ( )
q . AddCountByTimePipe ( int64 ( step ) , int64 ( offset ) , fields )
2024-05-20 02:08:30 +00:00
var mLock sync . Mutex
m := make ( map [ string ] * hitsSeries )
writeBlock := func ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
if len ( columns ) == 0 || len ( columns [ 0 ] . Values ) == 0 {
return
}
timestampValues := columns [ 0 ] . Values
hitsValues := columns [ len ( columns ) - 1 ] . Values
columns = columns [ 1 : len ( columns ) - 1 ]
bb := blockResultPool . Get ( )
for i := range timestamps {
timestampStr := strings . Clone ( timestampValues [ i ] )
hitsStr := strings . Clone ( hitsValues [ i ] )
2024-06-28 01:08:37 +00:00
hits , err := strconv . ParseUint ( hitsStr , 10 , 64 )
if err != nil {
logger . Panicf ( "BUG: cannot parse hitsStr=%q: %s" , hitsStr , err )
}
2024-05-20 02:08:30 +00:00
bb . Reset ( )
2024-05-25 19:36:16 +00:00
WriteFieldsForHits ( bb , columns , i )
2024-05-20 02:08:30 +00:00
mLock . Lock ( )
hs , ok := m [ string ( bb . B ) ]
if ! ok {
k := string ( bb . B )
hs = & hitsSeries { }
m [ k ] = hs
}
hs . timestamps = append ( hs . timestamps , timestampStr )
2024-06-28 01:08:37 +00:00
hs . hits = append ( hs . hits , hits )
hs . hitsTotal += hits
2024-05-20 02:08:30 +00:00
mLock . Unlock ( )
}
blockResultPool . Put ( bb )
}
// Execute the query
if err := vlstorage . RunQuery ( ctx , tenantIDs , q , writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute query [%s]: %s" , q , err )
return
}
2024-06-28 01:08:37 +00:00
m = getTopHitsSeries ( m , fieldsLimit )
2024-05-20 02:08:30 +00:00
// Write response
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteHitsSeries ( w , m )
}
2024-06-28 01:08:37 +00:00
// getTopHitsSeries returns up to fieldsLimit series from m with the biggest hitsTotal
// plus a series under the "{}" key, which sums up hits from the remaining series per timestamp.
//
// m is returned as is if fieldsLimit is non-positive or if m contains at most fieldsLimit series.
//
// The result is deterministic: series with equal hitsTotal are ordered by their key,
// and the points of the "{}" series are ordered by timestamp string, so the outcome
// doesn't depend on map iteration order.
func getTopHitsSeries(m map[string]*hitsSeries, fieldsLimit int) map[string]*hitsSeries {
	if fieldsLimit <= 0 || fieldsLimit >= len(m) {
		return m
	}

	type fieldsHits struct {
		fieldsStr string
		hs        *hitsSeries
	}
	a := make([]fieldsHits, 0, len(m))
	for fieldsStr, hs := range m {
		a = append(a, fieldsHits{
			fieldsStr: fieldsStr,
			hs:        hs,
		})
	}
	sort.Slice(a, func(i, j int) bool {
		if a[i].hs.hitsTotal != a[j].hs.hitsTotal {
			return a[i].hs.hitsTotal > a[j].hs.hitsTotal
		}
		// Break ties by key; otherwise the set of selected series would depend
		// on the random map iteration order above.
		return a[i].fieldsStr < a[j].fieldsStr
	})

	// Sum up hits of the series, which didn't make it into the top, per timestamp.
	hitsOther := make(map[string]uint64)
	for _, x := range a[fieldsLimit:] {
		for i, timestampStr := range x.hs.timestamps {
			hitsOther[timestampStr] += x.hs.hits[i]
		}
	}
	otherTimestamps := make([]string, 0, len(hitsOther))
	for timestampStr := range hitsOther {
		otherTimestamps = append(otherTimestamps, timestampStr)
	}
	sort.Strings(otherTimestamps)

	hsOther := &hitsSeries{
		timestamps: otherTimestamps,
		hits:       make([]uint64, 0, len(otherTimestamps)),
	}
	for _, timestampStr := range otherTimestamps {
		hits := hitsOther[timestampStr]
		hsOther.hits = append(hsOther.hits, hits)
		hsOther.hitsTotal += hits
	}

	mNew := make(map[string]*hitsSeries, fieldsLimit+1)
	for _, x := range a[:fieldsLimit] {
		mNew[x.fieldsStr] = x.hs
	}
	mNew["{}"] = hsOther

	return mNew
}
2024-05-20 02:08:30 +00:00
// hitsSeries holds hits for a single series.
//
// timestamps[i] and hits[i] form a single point, while hitsTotal is maintained
// by the code appending points as the running sum of hits.
type hitsSeries struct {
	hitsTotal  uint64
	timestamps []string
	hits       []uint64
}

// sort orders the points in hs by their timestamp strings.
func (hs *hitsSeries) sort() {
	sort.Sort(hs)
}

// Len implements sort.Interface.
func (hs *hitsSeries) Len() int {
	return len(hs.timestamps)
}

// Swap implements sort.Interface. It swaps both the timestamps and the matching hits.
func (hs *hitsSeries) Swap(i, j int) {
	ts, hits := hs.timestamps, hs.hits
	ts[i], ts[j] = ts[j], ts[i]
	hits[i], hits[j] = hits[j], hits[i]
}

// Less implements sort.Interface. Timestamps are compared as strings.
func (hs *hitsSeries) Less(i, j int) bool {
	return hs.timestamps[i] < hs.timestamps[j]
}
// ProcessFieldNamesRequest handles /select/logsql/field_names request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names
func ProcessFieldNamesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2024-05-15 02:55:44 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Obtain field names for the given query
q . Optimize ( )
fieldNames , err := vlstorage . GetFieldNames ( ctx , tenantIDs , q )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain field names: %s" , err )
return
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , fieldNames )
2024-05-20 02:08:30 +00:00
}
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values
func ProcessFieldValuesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2024-05-15 02:55:44 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Parse fieldName query arg
2024-05-22 19:01:20 +00:00
fieldName := r . FormValue ( "field" )
2024-05-20 02:08:30 +00:00
if fieldName == "" {
2024-05-22 19:01:20 +00:00
httpserver . Errorf ( w , r , "missing 'field' query arg" )
2024-05-20 02:08:30 +00:00
return
2024-05-15 02:55:44 +00:00
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
if limit < 0 {
limit = 0
2024-05-14 01:05:03 +00:00
}
2024-05-20 02:08:30 +00:00
// Obtain unique values for the given field
2024-05-15 02:55:44 +00:00
q . Optimize ( )
2024-05-20 02:08:30 +00:00
values , err := vlstorage . GetFieldValues ( ctx , tenantIDs , q , fieldName , uint64 ( limit ) )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain values for field %q: %s" , fieldName , err )
return
}
2024-05-14 01:05:03 +00:00
2024-05-22 19:01:20 +00:00
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , values )
2024-05-22 19:01:20 +00:00
}
2024-05-25 19:36:16 +00:00
// ProcessStreamFieldNamesRequest processes /select/logsql/stream_field_names request.
2024-05-22 19:01:20 +00:00
//
2024-05-25 19:36:16 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names
func ProcessStreamFieldNamesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
2024-05-22 19:01:20 +00:00
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-25 19:36:16 +00:00
// Obtain stream field names for the given query
2024-05-22 19:01:20 +00:00
q . Optimize ( )
2024-05-25 19:36:16 +00:00
names , err := vlstorage . GetStreamFieldNames ( ctx , tenantIDs , q )
2024-05-22 19:01:20 +00:00
if err != nil {
2024-05-25 19:36:16 +00:00
httpserver . Errorf ( w , r , "cannot obtain stream field names: %s" , err )
2024-05-20 02:08:30 +00:00
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , names )
2024-05-22 19:01:20 +00:00
}
2024-05-25 19:36:16 +00:00
// ProcessStreamFieldValuesRequest processes /select/logsql/stream_field_values request.
2024-05-22 19:01:20 +00:00
//
2024-05-25 19:36:16 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values
func ProcessStreamFieldValuesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
2024-05-22 19:01:20 +00:00
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-25 19:36:16 +00:00
// Parse fieldName query arg
fieldName := r . FormValue ( "field" )
if fieldName == "" {
httpserver . Errorf ( w , r , "missing 'field' query arg" )
2024-05-22 19:01:20 +00:00
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if limit < 0 {
limit = 0
}
2024-05-25 19:36:16 +00:00
// Obtain stream field values for the given query and the given fieldName
2024-05-22 19:01:20 +00:00
q . Optimize ( )
2024-05-25 19:36:16 +00:00
values , err := vlstorage . GetStreamFieldValues ( ctx , tenantIDs , q , fieldName , uint64 ( limit ) )
2024-05-22 19:01:20 +00:00
if err != nil {
2024-05-25 19:36:16 +00:00
httpserver . Errorf ( w , r , "cannot obtain stream field values: %s" , err )
2024-05-22 19:01:20 +00:00
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , values )
2024-05-22 19:01:20 +00:00
}
2024-06-27 12:18:42 +00:00
// ProcessStreamIDsRequest processes /select/logsql/stream_ids request.
//
// The optional `limit` query arg is passed to vlstorage.GetStreamIDs;
// negative values are replaced with zero.
// An error response is written instead of results if any step fails.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream_ids
func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	q, tenantIDs, err := parseCommonArgs(r)
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}

	// Parse limit query arg
	limit, err := httputils.GetInt(r, "limit")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	if limit < 0 {
		limit = 0
	}

	// Obtain streamIDs for the given query
	q.Optimize()
	streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit))
	if err != nil {
		httpserver.Errorf(w, r, "cannot obtain stream_ids: %s", err)
		// The error response has been already sent; do not append results to it.
		return
	}

	// Write results
	w.Header().Set("Content-Type", "application/json")
	WriteValuesWithHitsJSON(w, streamIDs)
}
2024-05-22 19:01:20 +00:00
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
func ProcessStreamsRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if limit < 0 {
limit = 0
}
// Obtain streams for the given query
q . Optimize ( )
streams , err := vlstorage . GetStreams ( ctx , tenantIDs , q , uint64 ( limit ) )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain streams: %s" , err )
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , streams )
2024-05-20 02:08:30 +00:00
}
2024-06-27 12:18:42 +00:00
// ProcessLiveTailRequest processes live tailing request to /select/logsq/tail
2024-09-06 21:00:55 +00:00
//
// See https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
2024-06-27 12:18:42 +00:00
func ProcessLiveTailRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
liveTailRequests . Inc ( )
defer liveTailRequests . Dec ( )
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if ! q . CanLiveTail ( ) {
httpserver . Errorf ( w , r , "the query [%s] cannot be used in live tailing; see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing for details" , q )
}
q . Optimize ( )
refreshIntervalMsecs , err := httputils . GetDuration ( r , "refresh_interval" , 1000 )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
refreshInterval := time . Millisecond * time . Duration ( refreshIntervalMsecs )
ctxWithCancel , cancel := context . WithCancel ( ctx )
tp := newTailProcessor ( cancel )
ticker := time . NewTicker ( refreshInterval )
defer ticker . Stop ( )
end := time . Now ( ) . UnixNano ( )
doneCh := ctxWithCancel . Done ( )
2024-06-27 13:05:57 +00:00
flusher , ok := w . ( http . Flusher )
if ! ok {
logger . Panicf ( "BUG: it is expected that http.ResponseWriter (%T) supports http.Flusher interface" , w )
}
2024-06-27 12:18:42 +00:00
for {
start := end - tailOffsetNsecs
end = time . Now ( ) . UnixNano ( )
qCopy := q . Clone ( )
qCopy . AddTimeFilter ( start , end )
if err := vlstorage . RunQuery ( ctxWithCancel , tenantIDs , qCopy , tp . writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute tail query [%s]: %s" , q , err )
return
}
resultRows , err := tp . getTailRows ( )
if err != nil {
httpserver . Errorf ( w , r , "cannot get tail results for query [%q]: %s" , q , err )
return
}
2024-06-27 13:05:57 +00:00
if len ( resultRows ) > 0 {
WriteJSONRows ( w , resultRows )
flusher . Flush ( )
}
2024-06-27 12:18:42 +00:00
select {
case <- doneCh :
return
case <- ticker . C :
}
}
}
// liveTailRequests is incremented when ProcessLiveTailRequest starts and decremented when it returns.
//
// The metric name must not contain whitespace, since it must be a valid metric identifier.
var liveTailRequests = metrics.NewCounter(`vl_live_tailing_requests`)

// tailOffsetNsecs is the overlap (in nanoseconds) between time ranges of subsequent live tailing iterations:
// every iteration starts tailOffsetNsecs before the end of the previous one.
const tailOffsetNsecs = 5e9
// logRow is a single log entry collected by tailProcessor.
type logRow struct {
// timestamp is the timestamp of the entry; sortLogRows orders rows by it.
timestamp int64
// fields contains the cloned column names and values of the entry.
fields [ ] logstorage . Field
}
func sortLogRows ( rows [ ] logRow ) {
2024-06-28 17:14:29 +00:00
sort . SliceStable ( rows , func ( i , j int ) bool {
2024-06-27 12:18:42 +00:00
return rows [ i ] . timestamp < rows [ j ] . timestamp
} )
}
// tailProcessor collects rows passed to writeBlock and returns
// the not-yet-returned ones via getTailRows.
type tailProcessor struct {
// cancel is called by writeBlock when it detects an error.
cancel func ( )
// mu is held by writeBlock while it updates the fields below.
mu sync . Mutex
// perStreamRows contains the collected rows keyed by the `_stream_id` field value.
// It is cleared by getTailRows.
perStreamRows map [ string ] [ ] logRow
// lastTimestamps contains the timestamp of the last row returned by getTailRows per stream.
lastTimestamps map [ string ] int64
// err is the error set by writeBlock; getTailRows returns it.
err error
}
// newTailProcessor returns a tailProcessor with empty state,
// which stores the given cancel func for later use.
func newTailProcessor(cancel func()) *tailProcessor {
	tp := &tailProcessor{cancel: cancel}
	tp.perStreamRows = make(map[string][]logRow)
	tp.lastTimestamps = make(map[string]int64)
	return tp
}
func ( tp * tailProcessor ) writeBlock ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
if len ( timestamps ) == 0 {
return
}
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
if tp . err != nil {
return
}
2024-06-27 13:05:57 +00:00
// Make sure columns contain _time field, since it is needed for proper tail work.
2024-06-27 12:18:42 +00:00
hasTime := false
for _ , c := range columns {
if c . Name == "_time" {
hasTime = true
2024-06-27 13:05:57 +00:00
break
2024-06-27 12:18:42 +00:00
}
}
if ! hasTime {
tp . err = fmt . Errorf ( "missing _time field" )
tp . cancel ( )
return
}
// Copy block rows to tp.perStreamRows
for i , timestamp := range timestamps {
streamID := ""
fields := make ( [ ] logstorage . Field , len ( columns ) )
for j , c := range columns {
name := strings . Clone ( c . Name )
value := strings . Clone ( c . Values [ i ] )
fields [ j ] = logstorage . Field {
Name : name ,
Value : value ,
}
if name == "_stream_id" {
streamID = value
}
}
2024-06-27 13:05:57 +00:00
2024-06-27 12:18:42 +00:00
tp . perStreamRows [ streamID ] = append ( tp . perStreamRows [ streamID ] , logRow {
timestamp : timestamp ,
fields : fields ,
} )
}
}
func ( tp * tailProcessor ) getTailRows ( ) ( [ ] [ ] logstorage . Field , error ) {
if tp . err != nil {
return nil , tp . err
}
var resultRows [ ] logRow
for streamID , rows := range tp . perStreamRows {
sortLogRows ( rows )
lastTimestamp , ok := tp . lastTimestamps [ streamID ]
if ok {
// Skip already written rows
2024-06-27 13:05:57 +00:00
for len ( rows ) > 0 && rows [ 0 ] . timestamp <= lastTimestamp {
rows = rows [ 1 : ]
2024-06-27 12:18:42 +00:00
}
}
2024-06-27 13:05:57 +00:00
if len ( rows ) > 0 {
resultRows = append ( resultRows , rows ... )
tp . lastTimestamps [ streamID ] = rows [ len ( rows ) - 1 ] . timestamp
}
2024-06-27 12:18:42 +00:00
}
clear ( tp . perStreamRows )
sortLogRows ( resultRows )
tailRows := make ( [ ] [ ] logstorage . Field , len ( resultRows ) )
for i , row := range resultRows {
tailRows [ i ] = row . fields
}
return tailRows , nil
}
2024-09-06 22:41:44 +00:00
// ProcessStatsQueryRangeRequest handles /select/logsql/stats_query_range request.
//
// The query must end with `| stats ...` pipe. Results are grouped into series keyed
// by the stats column name plus the `by(...)` labels; points in every series are sorted by time
// and series are sorted by their key before the response is written.
// Errors are sent via httpserver.SendPrometheusError.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats
func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	q, tenantIDs, err := parseCommonArgs(r)
	if err != nil {
		httpserver.SendPrometheusError(w, r, err)
		return
	}

	// Parse the `step` arg; it defaults to one day.
	stepArg := r.FormValue("step")
	if stepArg == "" {
		stepArg = "1d"
	}
	step, err := promutils.ParseDuration(stepArg)
	if err != nil {
		httpserver.SendPrometheusError(w, r, fmt.Errorf("cannot parse 'step' arg: %s", err))
		return
	}
	if step <= 0 {
		httpserver.SendPrometheusError(w, r, fmt.Errorf("'step' must be bigger than zero"))
		return
	}

	// Obtain `by(...)` fields from the last `| stats` pipe in q.
	// Add `_time:step` to the `by(...)` list.
	byFields, ok := q.GetStatsByFields(int64(step))
	if !ok {
		httpserver.SendPrometheusError(w, r, fmt.Errorf("the query must end with '| stats ...'; got [%s]", q))
		return
	}

	q.Optimize()

	// seriesByKey is filled by writeBlock; seriesLock serializes access to it.
	var seriesLock sync.Mutex
	seriesByKey := make(map[string]*statsSeries)
	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
		columnNames := make([]string, len(columns))
		for colIdx := range columns {
			columnNames[colIdx] = strings.Clone(columns[colIdx].Name)
		}

		for rowIdx := range timestamps {
			// Use the query timestamp unless the row contains a parseable `_time` value.
			timestamp := q.GetTimestamp()
			labels := make([]logstorage.Field, 0, len(byFields))
			for colIdx, c := range columns {
				if c.Name == "_time" {
					if nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[rowIdx]); ok {
						timestamp = nsec
						continue
					}
				}
				if !slices.Contains(byFields, c.Name) {
					continue
				}
				labels = append(labels, logstorage.Field{
					Name:  columnNames[colIdx],
					Value: strings.Clone(c.Values[rowIdx]),
				})
			}

			// Every column outside `by(...)` yields a point in its own series.
			var keyBuf []byte
			for colIdx, c := range columns {
				if slices.Contains(byFields, c.Name) {
					continue
				}
				name := columnNames[colIdx]
				keyBuf = append(keyBuf[:0], name...)
				keyBuf = logstorage.MarshalFieldsToJSON(keyBuf, labels)
				key := string(keyBuf)
				point := statsPoint{
					Timestamp: timestamp,
					Value:     strings.Clone(c.Values[rowIdx]),
				}

				seriesLock.Lock()
				ss := seriesByKey[key]
				if ss == nil {
					ss = &statsSeries{
						key:    key,
						Name:   name,
						Labels: labels,
					}
					seriesByKey[key] = ss
				}
				ss.Points = append(ss.Points, point)
				seriesLock.Unlock()
			}
		}
	}

	if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
		httpserver.SendPrometheusError(w, r, fmt.Errorf("cannot execute query [%s]: %s", q, err))
		return
	}

	// Sort points in every series by time and the series by key.
	rows := make([]*statsSeries, 0, len(seriesByKey))
	for _, ss := range seriesByKey {
		points := ss.Points
		sort.Slice(points, func(a, b int) bool {
			return points[a].Timestamp < points[b].Timestamp
		})
		rows = append(rows, ss)
	}
	sort.Slice(rows, func(a, b int) bool {
		return rows[a].key < rows[b].key
	})

	w.Header().Set("Content-Type", "application/json")
	WriteStatsQueryRangeResponse(w, rows)
}
// statsSeries is a single series collected by ProcessStatsQueryRangeRequest.
type statsSeries struct {
// key identifies the series: it is the series name followed by JSON-marshaled labels.
// It is used for sorting the series in the response.
key string
// Name is the name of the results column the points were read from.
Name string
// Labels contains the `by(...)` fields with their values for the series.
Labels [ ] logstorage . Field
// Points contains the collected points of the series.
Points [ ] statsPoint
}
// statsPoint is a single point in statsSeries.
type statsPoint struct {
// Timestamp is set either from the parsed `_time` column value or from the query timestamp.
Timestamp int64
// Value is the cloned column value as returned by the query.
Value string
}
2024-09-06 21:00:55 +00:00
// ProcessStatsQueryRequest handles /select/logsql/stats_query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats
func ProcessStatsQueryRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . SendPrometheusError ( w , r , err )
return
}
2024-09-06 22:41:44 +00:00
// Obtain `by(...)` fields from the last `| stats` pipe in q.
byFields , ok := q . GetStatsByFields ( 0 )
2024-09-06 21:00:55 +00:00
if ! ok {
err := fmt . Errorf ( "the query must end with '| stats ...'; got [%s]" , q )
httpserver . SendPrometheusError ( w , r , err )
return
}
q . Optimize ( )
var rows [ ] statsRow
var rowsLock sync . Mutex
timestamp := q . GetTimestamp ( )
writeBlock := func ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
clonedColumnNames := make ( [ ] string , len ( columns ) )
for i , c := range columns {
clonedColumnNames [ i ] = strings . Clone ( c . Name )
}
for i := range timestamps {
labels := make ( [ ] logstorage . Field , 0 , len ( byFields ) )
for j , c := range columns {
if slices . Contains ( byFields , c . Name ) {
labels = append ( labels , logstorage . Field {
Name : clonedColumnNames [ j ] ,
Value : strings . Clone ( c . Values [ i ] ) ,
} )
}
}
for j , c := range columns {
if ! slices . Contains ( byFields , c . Name ) {
r := statsRow {
Name : clonedColumnNames [ j ] ,
Labels : labels ,
Timestamp : timestamp ,
Value : strings . Clone ( c . Values [ i ] ) ,
}
2024-09-06 22:41:44 +00:00
2024-09-06 21:00:55 +00:00
rowsLock . Lock ( )
rows = append ( rows , r )
rowsLock . Unlock ( )
}
}
}
}
if err := vlstorage . RunQuery ( ctx , tenantIDs , q , writeBlock ) ; err != nil {
err = fmt . Errorf ( "cannot execute query [%s]: %s" , q , err )
httpserver . SendPrometheusError ( w , r , err )
return
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteStatsQueryResponse ( w , rows )
}
// statsRow is a single result row collected by ProcessStatsQueryRequest.
type statsRow struct {
// Name is the name of the results column the value was read from.
Name string
// Labels contains the `by(...)` fields with their values for the row.
Labels [ ] logstorage . Field
// Timestamp is the query timestamp.
Timestamp int64
// Value is the cloned column value as returned by the query.
Value string
}
2024-05-20 02:08:30 +00:00
// ProcessQueryRequest handles /select/logsql/query request.
//
2024-09-06 21:00:55 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-logs
2024-05-20 02:08:30 +00:00
func ProcessQueryRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-06-03 22:59:25 +00:00
bw := getBufferedWriter ( w )
defer func ( ) {
bw . FlushIgnoreErrors ( )
putBufferedWriter ( bw )
} ( )
w . Header ( ) . Set ( "Content-Type" , "application/stream+json" )
2024-05-20 02:08:30 +00:00
if limit > 0 {
2024-06-03 22:59:25 +00:00
if q . CanReturnLastNResults ( ) {
rows , err := getLastNQueryResults ( ctx , tenantIDs , q , limit )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
bb := blockResultPool . Get ( )
b := bb . B
for i := range rows {
b = logstorage . MarshalFieldsToJSON ( b [ : 0 ] , rows [ i ] . fields )
b = append ( b , '\n' )
bw . WriteIgnoreErrors ( b )
}
bb . B = b
blockResultPool . Put ( bb )
return
}
2024-05-20 02:08:30 +00:00
q . AddPipeLimit ( uint64 ( limit ) )
}
2024-06-11 15:50:32 +00:00
q . Optimize ( )
2024-05-12 14:33:29 +00:00
writeBlock := func ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
2024-05-20 02:08:30 +00:00
if len ( columns ) == 0 || len ( columns [ 0 ] . Values ) == 0 {
2024-02-18 21:01:34 +00:00
return
2023-06-20 05:55:12 +00:00
}
2024-02-18 21:01:34 +00:00
2023-06-20 05:55:12 +00:00
bb := blockResultPool . Get ( )
2024-05-12 14:33:29 +00:00
for i := range timestamps {
WriteJSONRow ( bb , columns , i )
2023-06-20 05:55:12 +00:00
}
2024-05-14 01:05:03 +00:00
bw . WriteIgnoreErrors ( bb . B )
2024-02-18 21:01:34 +00:00
blockResultPool . Put ( bb )
2024-05-12 14:33:29 +00:00
}
2024-06-03 22:59:25 +00:00
if err := vlstorage . RunQuery ( ctx , tenantIDs , q , writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute query [%s]: %s" , q , err )
2024-06-27 12:18:42 +00:00
return
2024-06-03 22:59:25 +00:00
}
}
2024-05-12 14:33:29 +00:00
2024-06-03 22:59:25 +00:00
// blockResultPool is a pool of byte buffers, which request handlers in this file
// obtain via Get and return via Put when building responses.
var blockResultPool bytesutil . ByteBufferPool
2024-05-12 14:33:29 +00:00
2024-06-03 22:59:25 +00:00
// row is a single query result row collected by getQueryResultsWithLimit.
type row struct {
// timestamp is the timestamp of the row; getLastNRows sorts rows by it.
timestamp int64
// fields contains the cloned column names and values of the row.
fields [ ] logstorage . Field
}
func getLastNQueryResults ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , limit int ) ( [ ] row , error ) {
2024-06-05 01:18:12 +00:00
limitUpper := 2 * limit
q . AddPipeLimit ( uint64 ( limitUpper ) )
2024-06-03 22:59:25 +00:00
q . Optimize ( )
2024-06-05 01:18:12 +00:00
rows , err := getQueryResultsWithLimit ( ctx , tenantIDs , q , limitUpper )
2024-05-12 14:33:29 +00:00
if err != nil {
2024-06-03 22:59:25 +00:00
return nil , err
}
2024-06-05 01:18:12 +00:00
if len ( rows ) < limitUpper {
// Fast path - the requested time range contains up to limitUpper rows.
rows = getLastNRows ( rows , limit )
2024-06-03 22:59:25 +00:00
return rows , nil
}
2024-06-05 01:18:12 +00:00
// Slow path - search for the time range containing up to limitUpper rows.
2024-06-03 22:59:25 +00:00
start , end := q . GetFilterTimeRange ( )
d := end / 2 - start / 2
start += d
qOrig := q
for {
q = qOrig . Clone ( )
q . AddTimeFilter ( start , end )
2024-06-05 01:18:12 +00:00
rows , err := getQueryResultsWithLimit ( ctx , tenantIDs , q , limitUpper )
2024-06-03 22:59:25 +00:00
if err != nil {
return nil , err
}
2024-06-05 01:18:12 +00:00
if len ( rows ) >= limit && len ( rows ) < limitUpper || d == 0 {
rows = getLastNRows ( rows , limit )
2024-06-03 22:59:25 +00:00
return rows , nil
}
lastBit := d & 1
d /= 2
if len ( rows ) > limit {
start += d
} else {
start -= d + lastBit
}
2024-05-12 14:33:29 +00:00
}
2023-06-20 05:55:12 +00:00
}
2024-06-05 01:18:12 +00:00
func getLastNRows ( rows [ ] row , limit int ) [ ] row {
2024-06-03 22:59:25 +00:00
sort . Slice ( rows , func ( i , j int ) bool {
return rows [ i ] . timestamp < rows [ j ] . timestamp
} )
2024-06-05 01:18:12 +00:00
if len ( rows ) > limit {
rows = rows [ len ( rows ) - limit : ]
}
return rows
2024-06-03 22:59:25 +00:00
}
// getQueryResultsWithLimit runs q and returns the collected rows.
//
// The query context is canceled as soon as at least limit rows are collected,
// so the returned slice may contain more than limit rows.
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
	ctxWithCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	// rows is filled by writeBlock; rowsLock serializes access to it.
	var rowsLock sync.Mutex
	var rows []row
	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
		rowsLock.Lock()
		defer rowsLock.Unlock()

		for rowIdx, timestamp := range timestamps {
			fields := make([]logstorage.Field, 0, len(columns))
			for colIdx := range columns {
				c := &columns[colIdx]
				fields = append(fields, logstorage.Field{
					Name:  strings.Clone(c.Name),
					Value: strings.Clone(c.Values[rowIdx]),
				})
			}
			rows = append(rows, row{
				timestamp: timestamp,
				fields:    fields,
			})
		}

		// Enough rows are collected; stop the query.
		if len(rows) >= limit {
			cancel()
		}
	}

	err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
	if err != nil {
		return nil, err
	}
	return rows, nil
}
2024-05-15 02:55:44 +00:00
2024-05-20 02:08:30 +00:00
func parseCommonArgs ( r * http . Request ) ( * logstorage . Query , [ ] logstorage . TenantID , error ) {
// Extract tenantID
tenantID , err := logstorage . GetTenantIDFromRequest ( r )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot obtain tenanID: %w" , err )
}
tenantIDs := [ ] logstorage . TenantID { tenantID }
2024-09-06 21:00:55 +00:00
// Parse optional time arg
timestamp , okTime , err := getTimeNsec ( r , "time" )
if err != nil {
return nil , nil , err
}
if ! okTime {
// If time arg is missing, then evaluate query at the current timestamp
timestamp = time . Now ( ) . UnixNano ( )
}
// decrease timestamp by one nanosecond in order to avoid capturing logs belonging
// to the first nanosecond at the next period of time (month, week, day, hour, etc.)
timestamp --
2024-05-20 02:08:30 +00:00
// Parse query
qStr := r . FormValue ( "query" )
2024-09-06 21:00:55 +00:00
q , err := logstorage . ParseQueryAtTimestamp ( qStr , timestamp )
2024-05-20 02:08:30 +00:00
if err != nil {
return nil , nil , fmt . Errorf ( "cannot parse query [%s]: %s" , qStr , err )
}
// Parse optional start and end args
start , okStart , err := getTimeNsec ( r , "start" )
if err != nil {
return nil , nil , err
}
end , okEnd , err := getTimeNsec ( r , "end" )
if err != nil {
return nil , nil , err
}
if okStart || okEnd {
if ! okStart {
start = math . MinInt64
}
if ! okEnd {
end = math . MaxInt64
}
q . AddTimeFilter ( start , end )
}
return q , tenantIDs , nil
}
2024-05-15 02:55:44 +00:00
func getTimeNsec ( r * http . Request , argName string ) ( int64 , bool , error ) {
s := r . FormValue ( argName )
if s == "" {
return 0 , false , nil
}
2024-06-03 22:59:25 +00:00
currentTimestamp := time . Now ( ) . UnixNano ( )
nsecs , err := promutils . ParseTimeAt ( s , currentTimestamp )
2024-05-15 02:55:44 +00:00
if err != nil {
return 0 , false , fmt . Errorf ( "cannot parse %s=%s: %w" , argName , s , err )
}
2024-06-03 22:59:25 +00:00
return nsecs , true , nil
2024-05-15 02:55:44 +00:00
}