mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vlselect/logsql: clone the query with the current timestamp when performing live tailing requests in the loop
Previously the original timestamp was used in the copied query, so _time:duration filters were applied to the original time range: (timestamp-duration ... timestamp]. This resulted in stopped live tailing, since new logs have timestamps bigger than the original time range. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7028
This commit is contained in:
parent
c89a7a0b62
commit
255d1d4e13
4 changed files with 13 additions and 7 deletions
|
@ -417,13 +417,17 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http.
|
|||
if !ok {
|
||||
logger.Panicf("BUG: it is expected that http.ResponseWriter (%T) supports http.Flusher interface", w)
|
||||
}
|
||||
qOrig := q
|
||||
for {
|
||||
start := end - tailOffsetNsecs
|
||||
end = time.Now().UnixNano()
|
||||
|
||||
qCopy := q.Clone()
|
||||
qCopy.AddTimeFilter(start, end)
|
||||
if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, qCopy, tp.writeBlock); err != nil {
|
||||
q = qOrig.Clone(end)
|
||||
q.AddTimeFilter(start, end)
|
||||
// q.Optimize() call is needed for converting '*' into filterNoop.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785#issuecomment-2358547733
|
||||
q.Optimize()
|
||||
if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, tp.writeBlock); err != nil {
|
||||
httpserver.Errorf(w, r, "cannot execute tail query [%s]: %s", q, err)
|
||||
return
|
||||
}
|
||||
|
@ -862,7 +866,8 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
|
|||
|
||||
qOrig := q
|
||||
for {
|
||||
q = qOrig.Clone()
|
||||
timestamp := qOrig.GetTimestamp()
|
||||
q = qOrig.Clone(timestamp)
|
||||
q.AddTimeFilter(start, end)
|
||||
// q.Optimize() call is needed for converting '*' into filterNoop.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785#issuecomment-2358547733
|
||||
|
|
|
@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
* FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070).
|
||||
* FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes.
|
||||
|
||||
* BUGFIX: properly returns logs from [`/select/logsql/tail` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) if the query contains [`_time:some_duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) like `_time:5m`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7028). The bug has been introduced in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs).
|
||||
* BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for itentifying the root cause of the issue.
|
||||
|
||||
## [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs)
|
||||
|
|
|
@ -336,9 +336,8 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
|
|||
}
|
||||
|
||||
// Clone returns a copy of q.
|
||||
func (q *Query) Clone() *Query {
|
||||
func (q *Query) Clone(timestamp int64) *Query {
|
||||
qStr := q.String()
|
||||
timestamp := q.GetTimestamp()
|
||||
qCopy, err := ParseQueryAtTimestamp(qStr, timestamp)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot parse %q: %s", qStr, err)
|
||||
|
|
|
@ -2000,7 +2000,8 @@ func TestQueryClone(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
qCopy := q.Clone()
|
||||
timestamp := q.GetTimestamp()
|
||||
qCopy := q.Clone(timestamp)
|
||||
qCopyStr := qCopy.String()
|
||||
if qStr != qCopyStr {
|
||||
t.Fatalf("unexpected cloned query\ngot\n%s\nwant\n%s", qCopyStr, qStr)
|
||||
|
|
Loading…
Reference in a new issue