VictoriaMetrics/app/vlselect/logsql/logsql.go

491 lines
12 KiB
Go
Raw Normal View History

package logsql
import (
2024-04-26 21:47:50 +00:00
"context"
2024-05-15 02:42:03 +00:00
"fmt"
"math"
"net/http"
2024-05-19 23:45:29 +00:00
"sort"
"strings"
2024-05-18 23:32:09 +00:00
"sync"
2024-05-15 02:42:03 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
2024-05-15 02:42:03 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
2024-05-18 23:32:09 +00:00
// ProcessHitsRequest handles /select/logsql/hits request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats
func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Obtain step
stepStr := r.FormValue("step")
if stepStr == "" {
stepStr = "1d"
}
step, err := promutils.ParseDuration(stepStr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse 'step' arg: %s", err)
return
}
if step <= 0 {
httpserver.Errorf(w, r, "'step' must be bigger than zero")
}
// Obtain offset
offsetStr := r.FormValue("offset")
if offsetStr == "" {
offsetStr = "0s"
}
offset, err := promutils.ParseDuration(offsetStr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse 'offset' arg: %s", err)
return
}
2024-05-19 17:16:28 +00:00
// Obtain field entries
fields := r.Form["field"]
2024-05-19 23:45:29 +00:00
// Prepare the query
2024-05-19 17:16:28 +00:00
q.AddCountByTimePipe(int64(step), int64(offset), fields)
2024-05-18 23:32:09 +00:00
q.Optimize()
2024-05-19 23:45:29 +00:00
var mLock sync.Mutex
m := make(map[string]*hitsSeries)
2024-05-18 23:32:09 +00:00
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(columns) == 0 || len(columns[0].Values) == 0 {
return
}
2024-05-19 23:45:29 +00:00
timestampValues := columns[0].Values
hitsValues := columns[len(columns)-1].Values
columns = columns[1 : len(columns)-1]
2024-05-18 23:32:09 +00:00
bb := blockResultPool.Get()
for i := range timestamps {
2024-05-19 23:45:29 +00:00
timestampStr := strings.Clone(timestampValues[i])
hitsStr := strings.Clone(hitsValues[i])
bb.Reset()
2024-05-25 09:59:47 +00:00
WriteFieldsForHits(bb, columns, i)
2024-05-19 23:45:29 +00:00
mLock.Lock()
hs, ok := m[string(bb.B)]
if !ok {
k := string(bb.B)
hs = &hitsSeries{}
m[k] = hs
}
hs.timestamps = append(hs.timestamps, timestampStr)
hs.values = append(hs.values, hitsStr)
mLock.Unlock()
2024-05-18 23:32:09 +00:00
}
blockResultPool.Put(bb)
}
2024-05-19 23:45:29 +00:00
// Execute the query
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
return
}
2024-05-18 23:32:09 +00:00
// Write response
w.Header().Set("Content-Type", "application/json")
2024-05-19 23:45:29 +00:00
WriteHitsSeries(w, m)
}
// hitsSeries accumulates the hits buckets collected for a single group of field values.
//
// timestamps[i] and values[i] describe the same bucket. Both are kept as the string
// representations produced by the query.
type hitsSeries struct {
	timestamps []string
	values     []string
}

// hitsSeries is ordered via sort.Sort, so it must satisfy sort.Interface.
var _ sort.Interface = (*hitsSeries)(nil)

// sort orders the series in place by timestamp, moving each hits value together with its timestamp.
func (s *hitsSeries) sort() {
	sort.Sort(s)
}

// Len implements sort.Interface.
func (s *hitsSeries) Len() int { return len(s.timestamps) }

// Swap implements sort.Interface; it swaps both the timestamps and the hits values at i and j.
func (s *hitsSeries) Swap(i, j int) {
	ts, vs := s.timestamps, s.values
	ts[i], ts[j] = ts[j], ts[i]
	vs[i], vs[j] = vs[j], vs[i]
}

// Less implements sort.Interface by comparing timestamps as plain strings.
//
// NOTE(review): this matches chronological order only if all timestamps share
// one fixed-width format. Confirm against the column producer.
func (s *hitsSeries) Less(i, j int) bool {
	return s.timestamps[i] < s.timestamps[j]
}
2024-05-18 21:07:51 +00:00
// ProcessFieldNamesRequest handles /select/logsql/field_names request.
2024-05-18 23:32:09 +00:00
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names
2024-05-18 21:07:51 +00:00
func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Obtain field names for the given query
q.Optimize()
fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q)
if err != nil {
2024-05-18 21:58:01 +00:00
httpserver.Errorf(w, r, "cannot obtain field names: %s", err)
2024-05-18 21:07:51 +00:00
return
}
// Write results
w.Header().Set("Content-Type", "application/json")
2024-05-24 01:03:12 +00:00
WriteValuesWithHitsJSON(w, fieldNames)
2024-05-18 21:07:51 +00:00
}
2024-05-18 17:40:23 +00:00
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
2024-05-18 23:32:09 +00:00
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values
2024-05-18 17:40:23 +00:00
func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-18 17:40:23 +00:00
// Parse fieldName query arg
2024-05-21 19:18:05 +00:00
fieldName := r.FormValue("field")
2024-05-18 17:40:23 +00:00
if fieldName == "" {
2024-05-21 19:18:05 +00:00
httpserver.Errorf(w, r, "missing 'field' query arg")
return
}
2024-05-18 17:40:23 +00:00
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
2024-05-15 02:42:03 +00:00
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-18 17:40:23 +00:00
if limit < 0 {
limit = 0
}
// Obtain unique values for the given field
q.Optimize()
2024-05-18 21:07:51 +00:00
values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
2024-05-15 02:42:03 +00:00
if err != nil {
2024-05-18 17:40:23 +00:00
httpserver.Errorf(w, r, "cannot obtain values for field %q: %s", fieldName, err)
2024-05-15 02:42:03 +00:00
return
}
2024-05-18 17:40:23 +00:00
2024-05-21 19:18:05 +00:00
// Write results
w.Header().Set("Content-Type", "application/json")
2024-05-24 01:03:12 +00:00
WriteValuesWithHitsJSON(w, values)
2024-05-21 19:18:05 +00:00
}
2024-05-25 09:59:47 +00:00
// ProcessStreamFieldNamesRequest processes /select/logsql/stream_field_names request.
2024-05-21 20:47:57 +00:00
//
2024-05-25 09:59:47 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names
func ProcessStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
2024-05-21 20:47:57 +00:00
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-25 09:59:47 +00:00
// Obtain stream field names for the given query
2024-05-21 20:47:57 +00:00
q.Optimize()
2024-05-25 09:59:47 +00:00
names, err := vlstorage.GetStreamFieldNames(ctx, tenantIDs, q)
2024-05-21 20:47:57 +00:00
if err != nil {
2024-05-25 09:59:47 +00:00
httpserver.Errorf(w, r, "cannot obtain stream field names: %s", err)
2024-05-21 20:47:57 +00:00
}
// Write results
w.Header().Set("Content-Type", "application/json")
2024-05-24 01:03:12 +00:00
WriteValuesWithHitsJSON(w, names)
2024-05-21 20:47:57 +00:00
}
2024-05-25 09:59:47 +00:00
// ProcessStreamFieldValuesRequest processes /select/logsql/stream_field_values request.
2024-05-21 20:47:57 +00:00
//
2024-05-25 09:59:47 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values
func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
2024-05-21 20:47:57 +00:00
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-25 09:59:47 +00:00
// Parse fieldName query arg
fieldName := r.FormValue("field")
if fieldName == "" {
httpserver.Errorf(w, r, "missing 'field' query arg")
2024-05-21 20:47:57 +00:00
return
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if limit < 0 {
limit = 0
}
2024-05-25 09:59:47 +00:00
// Obtain stream field values for the given query and the given fieldName
2024-05-21 20:47:57 +00:00
q.Optimize()
2024-05-25 09:59:47 +00:00
values, err := vlstorage.GetStreamFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
2024-05-21 20:47:57 +00:00
if err != nil {
2024-05-25 09:59:47 +00:00
httpserver.Errorf(w, r, "cannot obtain stream field values: %s", err)
2024-05-21 20:47:57 +00:00
}
// Write results
w.Header().Set("Content-Type", "application/json")
2024-05-24 01:03:12 +00:00
WriteValuesWithHitsJSON(w, values)
2024-05-21 20:47:57 +00:00
}
2024-05-21 19:18:05 +00:00
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if limit < 0 {
limit = 0
}
// Obtain streams for the given query
q.Optimize()
2024-05-21 20:47:57 +00:00
streams, err := vlstorage.GetStreams(ctx, tenantIDs, q, uint64(limit))
2024-05-21 19:18:05 +00:00
if err != nil {
httpserver.Errorf(w, r, "cannot obtain streams: %s", err)
2024-05-18 21:07:51 +00:00
}
2024-05-18 17:40:23 +00:00
// Write results
w.Header().Set("Content-Type", "application/json")
2024-05-24 01:03:12 +00:00
WriteValuesWithHitsJSON(w, streams)
2024-05-18 17:40:23 +00:00
}
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#http-api
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
2024-05-15 02:42:03 +00:00
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-06-03 14:58:47 +00:00
bw := getBufferedWriter(w)
defer func() {
bw.FlushIgnoreErrors()
putBufferedWriter(bw)
}()
w.Header().Set("Content-Type", "application/stream+json")
2024-05-14 01:04:11 +00:00
if limit > 0 {
2024-06-03 14:58:47 +00:00
if q.CanReturnLastNResults() {
rows, err := getLastNQueryResults(ctx, tenantIDs, q, limit)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
bb := blockResultPool.Get()
b := bb.B
for i := range rows {
b = logstorage.MarshalFieldsToJSON(b[:0], rows[i].fields)
b = append(b, '\n')
bw.WriteIgnoreErrors(b)
}
bb.B = b
blockResultPool.Put(bb)
return
}
2024-05-14 01:04:11 +00:00
q.AddPipeLimit(uint64(limit))
2024-06-03 14:58:47 +00:00
q.Optimize()
2024-05-14 01:04:11 +00:00
}
2024-04-26 21:47:50 +00:00
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
2024-05-18 23:32:09 +00:00
if len(columns) == 0 || len(columns[0].Values) == 0 {
return
}
bb := blockResultPool.Get()
2024-04-26 21:47:50 +00:00
for i := range timestamps {
WriteJSONRow(bb, columns, i)
}
2024-05-14 01:04:11 +00:00
bw.WriteIgnoreErrors(bb.B)
blockResultPool.Put(bb)
2024-04-26 21:47:50 +00:00
}
2024-06-03 14:58:47 +00:00
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
}
}
2024-04-26 21:47:50 +00:00
2024-06-03 14:58:47 +00:00
var blockResultPool bytesutil.ByteBufferPool
2024-04-27 20:08:03 +00:00
2024-06-03 14:58:47 +00:00
type row struct {
timestamp int64
fields []logstorage.Field
}
func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
q.AddPipeLimit(uint64(limit + 1))
q.Optimize()
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
2024-04-27 20:08:03 +00:00
if err != nil {
2024-06-03 14:58:47 +00:00
return nil, err
}
if len(rows) <= limit {
// Fast path - the requested time range contains up to limit rows
sortRowsByTime(rows)
return rows, nil
}
// Slow path - search for the time range with the requested limit rows.
start, end := q.GetFilterTimeRange()
d := (end - start) / 2
start += d
qOrig := q
for {
q = qOrig.Clone()
q.AddTimeFilter(start, end)
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
if err != nil {
return nil, err
}
if len(rows) == limit || d == 0 {
sortRowsByTime(rows)
if len(rows) > limit {
rows = rows[:limit]
}
return rows, nil
}
lastBit := d & 1
d /= 2
if len(rows) > limit {
start += d
} else {
start -= d + lastBit
}
2024-04-27 20:08:03 +00:00
}
}
2024-06-03 14:58:47 +00:00
func sortRowsByTime(rows []row) {
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
}
// getQueryResultsWithLimit runs q and collects the matching entries in memory.
//
// Once at least limit rows have been gathered, it cancels the context passed to
// vlstorage.RunQuery to stop the query early. A whole block is appended before this
// check, so more than limit rows may be returned. Rows are kept in the order the
// callback saw them; they are not sorted.
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
	queryCtx, stopQuery := context.WithCancel(ctx)
	defer stopQuery()

	var (
		mu        sync.Mutex
		collected []row
	)
	collect := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
		// The whole callback runs under mu (presumably RunQuery may invoke it concurrently).
		mu.Lock()
		defer mu.Unlock()

		for rowIdx, ts := range timestamps {
			fields := make([]logstorage.Field, 0, len(columns))
			for colIdx := range columns {
				col := &columns[colIdx]
				// Clone name and value so they remain valid after this callback returns.
				fields = append(fields, logstorage.Field{
					Name:  strings.Clone(col.Name),
					Value: strings.Clone(col.Values[rowIdx]),
				})
			}
			collected = append(collected, row{timestamp: ts, fields: fields})
		}

		if len(collected) >= limit {
			stopQuery()
		}
	}

	if err := vlstorage.RunQuery(queryCtx, tenantIDs, q, collect); err != nil {
		return nil, err
	}
	return collected, nil
}
2024-05-15 02:42:03 +00:00
2024-05-18 17:40:23 +00:00
func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, error) {
// Extract tenantID
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
return nil, nil, fmt.Errorf("cannot obtain tenanID: %w", err)
}
tenantIDs := []logstorage.TenantID{tenantID}
// Parse query
qStr := r.FormValue("query")
q, err := logstorage.ParseQuery(qStr)
if err != nil {
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
}
// Parse optional start and end args
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return nil, nil, err
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return nil, nil, err
}
if okStart || okEnd {
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}
q.AddTimeFilter(start, end)
}
return q, tenantIDs, nil
}
2024-05-15 02:42:03 +00:00
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
s := r.FormValue(argName)
if s == "" {
return 0, false, nil
}
2024-06-03 14:58:47 +00:00
currentTimestamp := time.Now().UnixNano()
nsecs, err := promutils.ParseTimeAt(s, currentTimestamp)
2024-05-15 02:42:03 +00:00
if err != nil {
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
}
2024-06-03 14:58:47 +00:00
return nsecs, true, nil
2024-05-15 02:42:03 +00:00
}