mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: consistently sort stream contexts belonging to different streams by the minimum time seen in the matching logs
This should simplify debugging of stream_context output, since it remains stable over repeated requests.
This commit is contained in:
parent
b49d1ea809
commit
9367a9a6a2
3 changed files with 40 additions and 6 deletions
|
@ -15,6 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* BUGFIX: consistently return matching log streams sorted by time from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously log streams could be returned in arbitrary order with every request. This could complicate using `stream_context` pipe.
|
||||||
|
* BUGFIX: add missing `_msg="---"` delimiter between stream contexts belonging to different [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). This should simplify investigating `stream_context` output for multiple matching log streams.
|
||||||
|
|
||||||
## [v0.30.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.0-victorialogs)
|
## [v0.30.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.0-victorialogs)
|
||||||
|
|
||||||
Released at 2024-09-27
|
Released at 2024-09-27
|
||||||
|
|
|
@ -530,7 +530,10 @@ func (pcp *pipeStreamContextProcessor) flush() error {
|
||||||
pcp: pcp,
|
pcp: pcp,
|
||||||
}
|
}
|
||||||
|
|
||||||
for streamID, rows := range m {
|
// write output contexts in the ascending order of rows
|
||||||
|
streamIDs := getStreamIDsSortedByMinRowTimestamp(m)
|
||||||
|
for _, streamID := range streamIDs {
|
||||||
|
rows := m[streamID]
|
||||||
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
|
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -557,6 +560,34 @@ func (pcp *pipeStreamContextProcessor) flush() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getStreamIDsSortedByMinRowTimestamp(m map[string][]streamContextRow) []string {
|
||||||
|
type streamTimestamp struct {
|
||||||
|
streamID string
|
||||||
|
timestamp int64
|
||||||
|
}
|
||||||
|
streamTimestamps := make([]streamTimestamp, 0, len(m))
|
||||||
|
for streamID, rows := range m {
|
||||||
|
minTimestamp := rows[0].timestamp
|
||||||
|
for _, r := range rows[1:] {
|
||||||
|
if r.timestamp < minTimestamp {
|
||||||
|
minTimestamp = r.timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamTimestamps = append(streamTimestamps, streamTimestamp{
|
||||||
|
streamID: streamID,
|
||||||
|
timestamp: minTimestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Slice(streamTimestamps, func(i, j int) bool {
|
||||||
|
return streamTimestamps[i].timestamp < streamTimestamps[j].timestamp
|
||||||
|
})
|
||||||
|
streamIDs := make([]string, len(streamTimestamps))
|
||||||
|
for i := range streamIDs {
|
||||||
|
streamIDs[i] = streamTimestamps[i].streamID
|
||||||
|
}
|
||||||
|
return streamIDs
|
||||||
|
}
|
||||||
|
|
||||||
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
|
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
|
||||||
return []Field{
|
return []Field{
|
||||||
{
|
{
|
||||||
|
|
|
@ -662,7 +662,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
| stream_context before 0
|
| stream_context before 0
|
||||||
| stats count() rows`, [][]Field{
|
| stats count() rows`, [][]Field{
|
||||||
{
|
{
|
||||||
{"rows", "33"},
|
{"rows", "66"},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -671,7 +671,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
| stream_context before 0 after 0
|
| stream_context before 0 after 0
|
||||||
| stats count() rows`, [][]Field{
|
| stats count() rows`, [][]Field{
|
||||||
{
|
{
|
||||||
{"rows", "33"},
|
{"rows", "66"},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -680,7 +680,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
| stream_context before 1
|
| stream_context before 1
|
||||||
| stats count() rows`, [][]Field{
|
| stats count() rows`, [][]Field{
|
||||||
{
|
{
|
||||||
{"rows", "66"},
|
{"rows", "99"},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -689,7 +689,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
| stream_context after 1
|
| stream_context after 1
|
||||||
| stats count() rows`, [][]Field{
|
| stats count() rows`, [][]Field{
|
||||||
{
|
{
|
||||||
{"rows", "66"},
|
{"rows", "99"},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -698,7 +698,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
| stream_context before 1 after 1
|
| stream_context before 1 after 1
|
||||||
| stats count() rows`, [][]Field{
|
| stats count() rows`, [][]Field{
|
||||||
{
|
{
|
||||||
{"rows", "99"},
|
{"rows", "132"},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue