lib/logstorage: return the expected hits results from uniq pipe when the number of unique values reaches the specified limit

Previously `uniq` pipe could return zero `hits` if the number of found unique values equals the specified limit.
This wasn't expected in most cases.
This commit is contained in:
Aliaksandr Valialkin 2024-09-29 10:50:25 +02:00
parent 55eb321f77
commit b52862badf
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 38 additions and 10 deletions

View file

@ -135,7 +135,7 @@ type pipeUniqProcessorShardNopad struct {
// //
// It returns false if the block cannot be written because of the exceeded limit. // It returns false if the block cannot be written because of the exceeded limit.
func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
if limit := shard.pu.limit; limit > 0 && uint64(len(shard.m)) >= limit { if limit := shard.pu.limit; limit > 0 && uint64(len(shard.m)) > limit {
return false return false
} }
@ -301,7 +301,7 @@ func (pup *pipeUniqProcessor) flush() error {
// There is little sense in returning partial hits when the limit on the number of unique entries is reached. // There is little sense in returning partial hits when the limit on the number of unique entries is reached.
// It is better from UX experience is to return zero hits instead. // It is better from UX experience is to return zero hits instead.
resetHits := pup.pu.limit > 0 && uint64(len(m)) >= pup.pu.limit resetHits := pup.pu.limit > 0 && uint64(len(m)) > pup.pu.limit
// write result // write result
wctx := &pipeUniqWriteContext{ wctx := &pipeUniqWriteContext{

View file

@ -114,13 +114,41 @@ func TestPipeUniq(t *testing.T) {
{ {
{"a", "2"}, {"a", "2"},
{"b", "3"}, {"b", "3"},
{"hits", "0"}, {"hits", "2"},
}, },
{ {
{"a", `2`}, {"a", `2`},
{"b", `54`}, {"b", `54`},
{"c", "d"}, {"c", "d"},
{"hits", "0"}, {"hits", "1"},
},
})
f("uniq hits limit 3", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
{"hits", "1"},
}, },
}) })

View file

@ -355,7 +355,7 @@ func (sup *statsCountUniqProcessor) updateState(v []byte) int {
func (sup *statsCountUniqProcessor) limitReached() bool { func (sup *statsCountUniqProcessor) limitReached() bool {
limit := sup.su.limit limit := sup.su.limit
return limit > 0 && uint64(len(sup.m)) >= limit return limit > 0 && uint64(len(sup.m)) > limit
} }
func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) { func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {

View file

@ -215,7 +215,7 @@ func (sup *statsUniqValuesProcessor) updateState(v string) int {
func (sup *statsUniqValuesProcessor) limitReached() bool { func (sup *statsUniqValuesProcessor) limitReached() bool {
limit := sup.su.limit limit := sup.su.limit
return limit > 0 && uint64(len(sup.m)) >= limit return limit > 0 && uint64(len(sup.m)) > limit
} }
func marshalJSONArray(items []string) string { func marshalJSONArray(items []string) string {

View file

@ -184,7 +184,7 @@ func (svp *statsValuesProcessor) finalizeStats() string {
func (svp *statsValuesProcessor) limitReached() bool { func (svp *statsValuesProcessor) limitReached() bool {
limit := svp.sv.limit limit := svp.sv.limit
return limit > 0 && uint64(len(svp.values)) >= limit return limit > 0 && uint64(len(svp.values)) > limit
} }
func parseStatsValues(lex *lexer) (*statsValues, error) { func parseStatsValues(lex *lexer) (*statsValues, error) {

View file

@ -384,9 +384,9 @@ func TestStorageRunQuery(t *testing.T) {
} }
resultsExpected := []ValueWithHits{ resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 0}, {`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 0}, {`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 0}, {`{instance="host-2:234",job="foobar"}`, 385},
} }
if !reflect.DeepEqual(results, resultsExpected) { if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected) t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)