mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: follow-up for af831a6c906158f371f1b6810706fa0a54b78386
Sync the code between top and sort pipes regarding the code related to rank.
This commit is contained in:
parent
8faee6b446
commit
7a623c225f
6 changed files with 59 additions and 48 deletions
|
@ -2169,6 +2169,7 @@ It is recommended limiting the number of logs before sorting with the following
|
||||||
|
|
||||||
See also:
|
See also:
|
||||||
|
|
||||||
|
- [`top` pipe](#top-pipe)
|
||||||
- [`stats` pipe](#stats-pipe)
|
- [`stats` pipe](#stats-pipe)
|
||||||
- [`limit` pipe](#limit-pipe)
|
- [`limit` pipe](#limit-pipe)
|
||||||
- [`offset` pipe](#offset-pipe)
|
- [`offset` pipe](#offset-pipe)
|
||||||
|
@ -2403,19 +2404,20 @@ _time:5m | fields ip | top
|
||||||
It is possible to set `rank` field per each returned entry for `top` pipe by adding `with rank`. For example, the following query sets the `rank` field per each returned `ip`:
|
It is possible to set `rank` field per each returned entry for `top` pipe by adding `with rank`. For example, the following query sets the `rank` field per each returned `ip`:
|
||||||
|
|
||||||
```logsql
|
```logsql
|
||||||
_time:5m | top 10 by (ip) with rank
|
_time:5m | top 10 by (ip) rank
|
||||||
```
|
```
|
||||||
|
|
||||||
The `rank` field can have other name. For example, the following query uses the `position` field name instead of `rank` field name in the output:
|
The `rank` field can have other name. For example, the following query uses the `position` field name instead of `rank` field name in the output:
|
||||||
|
|
||||||
```logsql
|
```logsql
|
||||||
_time:5m | top 10 by (ip) with rank as position
|
_time:5m | top 10 by (ip) rank as position
|
||||||
```
|
```
|
||||||
|
|
||||||
See also:
|
See also:
|
||||||
|
|
||||||
- [`uniq` pipe](#uniq-pipe)
|
- [`uniq` pipe](#uniq-pipe)
|
||||||
- [`stats` pipe](#stats-pipe)
|
- [`stats` pipe](#stats-pipe)
|
||||||
|
- [`sort` pipe](#sort-pipe)
|
||||||
|
|
||||||
### uniq pipe
|
### uniq pipe
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ type pipeSort struct {
|
||||||
limit uint64
|
limit uint64
|
||||||
|
|
||||||
// The name of the field to store the row rank.
|
// The name of the field to store the row rank.
|
||||||
rankName string
|
rankFieldName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *pipeSort) String() string {
|
func (ps *pipeSort) String() string {
|
||||||
|
@ -57,8 +57,8 @@ func (ps *pipeSort) String() string {
|
||||||
if ps.limit > 0 {
|
if ps.limit > 0 {
|
||||||
s += fmt.Sprintf(" limit %d", ps.limit)
|
s += fmt.Sprintf(" limit %d", ps.limit)
|
||||||
}
|
}
|
||||||
if ps.rankName != "" {
|
if ps.rankFieldName != "" {
|
||||||
s += " rank as " + quoteTokenIfNeeded(ps.rankName)
|
s += rankFieldNameString(ps.rankFieldName)
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -72,10 +72,10 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if ps.rankName != "" {
|
if ps.rankFieldName != "" {
|
||||||
neededFields.remove(ps.rankName)
|
neededFields.remove(ps.rankFieldName)
|
||||||
if neededFields.contains("*") {
|
if neededFields.contains("*") {
|
||||||
unneededFields.add(ps.rankName)
|
unneededFields.add(ps.rankFieldName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -533,9 +533,9 @@ type pipeSortWriteContext struct {
|
||||||
|
|
||||||
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
|
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
|
||||||
ps := shard.ps
|
ps := shard.ps
|
||||||
rankName := ps.rankName
|
rankFieldName := ps.rankFieldName
|
||||||
rankFields := 0
|
rankFields := 0
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
rankFields = 1
|
rankFields = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,8 +567,8 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
|
||||||
wctx.flush()
|
wctx.flush()
|
||||||
|
|
||||||
rcs = wctx.rcs[:0]
|
rcs = wctx.rcs[:0]
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
rcs = appendResultColumnWithName(rcs, rankName)
|
rcs = appendResultColumnWithName(rcs, rankFieldName)
|
||||||
}
|
}
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
rcs = appendResultColumnWithName(rcs, bf.name)
|
rcs = appendResultColumnWithName(rcs, bf.name)
|
||||||
|
@ -579,7 +579,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
|
||||||
wctx.rcs = rcs
|
wctx.rcs = rcs
|
||||||
}
|
}
|
||||||
|
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
bufLen := len(wctx.buf)
|
bufLen := len(wctx.buf)
|
||||||
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
|
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
|
||||||
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
|
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
|
||||||
|
@ -798,15 +798,11 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
|
||||||
}
|
}
|
||||||
ps.limit = n
|
ps.limit = n
|
||||||
case lex.isKeyword("rank"):
|
case lex.isKeyword("rank"):
|
||||||
lex.nextToken()
|
rankFieldName, err := parseRankFieldName(lex)
|
||||||
if lex.isKeyword("as") {
|
|
||||||
lex.nextToken()
|
|
||||||
}
|
|
||||||
rankName, err := getCompoundToken(lex)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read rank field name: %s", err)
|
return nil, fmt.Errorf("cannot read rank field name: %s", err)
|
||||||
}
|
}
|
||||||
ps.rankName = rankName
|
ps.rankFieldName = rankFieldName
|
||||||
default:
|
default:
|
||||||
return &ps, nil
|
return &ps, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ func TestParsePipeSortSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f(`sort`)
|
f(`sort`)
|
||||||
|
f(`sort rank`)
|
||||||
f(`sort rank as foo`)
|
f(`sort rank as foo`)
|
||||||
f(`sort by (x)`)
|
f(`sort by (x)`)
|
||||||
f(`sort by (x) limit 10`)
|
f(`sort by (x) limit 10`)
|
||||||
|
@ -26,7 +27,6 @@ func TestParsePipeSortFailure(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f(`sort a`)
|
f(`sort a`)
|
||||||
f(`sort rank`)
|
|
||||||
f(`sort by`)
|
f(`sort by`)
|
||||||
f(`sort by(x) foo`)
|
f(`sort by(x) foo`)
|
||||||
f(`sort by(x) limit`)
|
f(`sort by(x) limit`)
|
||||||
|
|
|
@ -440,9 +440,9 @@ type pipeTopkWriteContext struct {
|
||||||
|
|
||||||
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
|
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
|
||||||
ps := shard.ps
|
ps := shard.ps
|
||||||
rankName := ps.rankName
|
rankFieldName := ps.rankFieldName
|
||||||
rankFields := 0
|
rankFields := 0
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
rankFields = 1
|
rankFields = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -476,8 +476,8 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
|
||||||
wctx.flush()
|
wctx.flush()
|
||||||
|
|
||||||
rcs = wctx.rcs[:0]
|
rcs = wctx.rcs[:0]
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
rcs = appendResultColumnWithName(rcs, rankName)
|
rcs = appendResultColumnWithName(rcs, rankFieldName)
|
||||||
}
|
}
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
rcs = appendResultColumnWithName(rcs, bf.name)
|
rcs = appendResultColumnWithName(rcs, bf.name)
|
||||||
|
@ -488,7 +488,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
|
||||||
wctx.rcs = rcs
|
wctx.rcs = rcs
|
||||||
}
|
}
|
||||||
|
|
||||||
if rankName != "" {
|
if rankFieldName != "" {
|
||||||
bufLen := len(wctx.buf)
|
bufLen := len(wctx.buf)
|
||||||
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
|
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
|
||||||
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
|
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
|
||||||
|
|
|
@ -51,10 +51,7 @@ func (pt *pipeTop) String() string {
|
||||||
s += " by (" + fieldNamesString(pt.byFields) + ")"
|
s += " by (" + fieldNamesString(pt.byFields) + ")"
|
||||||
}
|
}
|
||||||
if pt.rankFieldName != "" {
|
if pt.rankFieldName != "" {
|
||||||
s += " with rank"
|
s += rankFieldNameString(pt.rankFieldName)
|
||||||
if pt.rankFieldName != "rank" {
|
|
||||||
s += " as " + pt.rankFieldName
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -685,26 +682,43 @@ func parsePipeTop(lex *lexer) (*pipeTop, error) {
|
||||||
hitsFieldName: hitsFieldName,
|
hitsFieldName: hitsFieldName,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !lex.isKeyword("with") {
|
if lex.isKeyword("rank") {
|
||||||
|
rankFieldName, err := parseRankFieldName(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
|
||||||
|
}
|
||||||
|
pt.rankFieldName = rankFieldName
|
||||||
|
}
|
||||||
return pt, nil
|
return pt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lex.nextToken()
|
func parseRankFieldName(lex *lexer) (string, error) {
|
||||||
if !lex.isKeyword("rank") {
|
if !lex.isKeyword("rank") {
|
||||||
return nil, fmt.Errorf("missing 'rank' word after 'with' in [%s]", pt)
|
return "", fmt.Errorf("unexpected token: %q; want 'rank'", lex.token)
|
||||||
}
|
}
|
||||||
lex.nextToken()
|
lex.nextToken()
|
||||||
pt.rankFieldName = "rank"
|
|
||||||
|
rankFieldName := "rank"
|
||||||
if lex.isKeyword("as") {
|
if lex.isKeyword("as") {
|
||||||
lex.nextToken()
|
lex.nextToken()
|
||||||
if lex.isKeyword("", "|", ")", "(") {
|
if lex.isKeyword("", "|", ")", "(") {
|
||||||
return nil, fmt.Errorf("missing rank name in [%s as]", pt)
|
return "", fmt.Errorf("missing rank name")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !lex.isKeyword("", "|", ")") {
|
if !lex.isKeyword("", "|", ")", "limit") {
|
||||||
pt.rankFieldName = lex.token
|
s, err := getCompoundToken(lex)
|
||||||
lex.nextToken()
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
rankFieldName = s
|
||||||
|
}
|
||||||
|
return rankFieldName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return pt, nil
|
func rankFieldNameString(rankFieldName string) string {
|
||||||
|
s := " rank"
|
||||||
|
if rankFieldName != "rank" {
|
||||||
|
s += " as " + rankFieldName
|
||||||
|
}
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,15 +11,15 @@ func TestParsePipeTopSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f(`top`)
|
f(`top`)
|
||||||
f(`top with rank`)
|
f(`top rank`)
|
||||||
f(`top 5`)
|
f(`top 5`)
|
||||||
f(`top 5 with rank as foo`)
|
f(`top 5 rank as foo`)
|
||||||
f(`top by (x)`)
|
f(`top by (x)`)
|
||||||
f(`top 5 by (x)`)
|
f(`top 5 by (x)`)
|
||||||
f(`top by (x, y)`)
|
f(`top by (x, y)`)
|
||||||
f(`top 5 by (x, y)`)
|
f(`top 5 by (x, y)`)
|
||||||
f(`top by (x) with rank`)
|
f(`top by (x) rank`)
|
||||||
f(`top by (x) with rank as foo`)
|
f(`top by (x) rank as foo`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParsePipeTopFailure(t *testing.T) {
|
func TestParsePipeTopFailure(t *testing.T) {
|
||||||
|
@ -34,8 +34,7 @@ func TestParsePipeTopFailure(t *testing.T) {
|
||||||
f(`top 5foo`)
|
f(`top 5foo`)
|
||||||
f(`top foo`)
|
f(`top foo`)
|
||||||
f(`top by`)
|
f(`top by`)
|
||||||
f(`top (x) with`)
|
f(`top (x) rank a b`)
|
||||||
f(`top (x) with rank as`)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPipeTop(t *testing.T) {
|
func TestPipeTop(t *testing.T) {
|
||||||
|
@ -72,7 +71,7 @@ func TestPipeTop(t *testing.T) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f("top with rank", [][]Field{
|
f("top rank", [][]Field{
|
||||||
{
|
{
|
||||||
{"a", `2`},
|
{"a", `2`},
|
||||||
{"b", `3`},
|
{"b", `3`},
|
||||||
|
@ -170,7 +169,7 @@ func TestPipeTop(t *testing.T) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f("top by (b) with rank as x", [][]Field{
|
f("top by (b) rank as x", [][]Field{
|
||||||
{
|
{
|
||||||
{"a", `2`},
|
{"a", `2`},
|
||||||
{"b", `3`},
|
{"b", `3`},
|
||||||
|
|
Loading…
Reference in a new issue