This commit is contained in:
Aliaksandr Valialkin 2024-05-09 02:52:28 +02:00
parent dbc809dbc2
commit 48ed6abe75
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
19 changed files with 585 additions and 288 deletions

View file

@ -0,0 +1,80 @@
package logstorage
import (
"sort"
"strings"
)
// fieldsSet is a set of field names.
//
// The special name "*" stands for "all the fields". When it is added via add(),
// it becomes the only entry in the set, and contains() reports true for any name.
type fieldsSet map[string]struct{}

// newFieldsSet returns an empty fieldsSet.
func newFieldsSet() fieldsSet {
	return fieldsSet(map[string]struct{}{})
}

// reset removes all the entries from fs.
func (fs fieldsSet) reset() {
	clear(fs)
}

// String returns the sorted entries of fs in the form "[a,b,c]".
func (fs fieldsSet) String() string {
	return "[" + strings.Join(fs.getAll(), ",") + "]"
}

// clone returns an independent copy of fs.
//
// Entries are inserted via add(), so the copy obeys the "*" rules
// even if fs itself was populated by direct map writes.
func (fs fieldsSet) clone() fieldsSet {
	fsNew := make(fieldsSet, len(fs))
	// Insertion order doesn't affect the resulting set, so there is no need
	// to build and sort a temporary slice of names here.
	for f := range fs {
		fsNew.add(f)
	}
	return fsNew
}

// getAll returns all the entries of fs sorted in ascending order.
//
// The returned slice is never nil.
func (fs fieldsSet) getAll() []string {
	a := make([]string, 0, len(fs))
	for f := range fs {
		a = append(a, f)
	}
	sort.Strings(a)
	return a
}

// contains returns true if fs holds the given field or the "*" entry.
func (fs fieldsSet) contains(field string) bool {
	if _, ok := fs[field]; ok {
		return true
	}
	_, ok := fs["*"]
	return ok
}

// removeAll removes every field from fields from fs. See remove() for details.
func (fs fieldsSet) removeAll(fields []string) {
	for _, f := range fields {
		fs.remove(f)
	}
}

// remove removes field from fs.
//
// Removing "*" empties the set. Removing any other field from a set
// holding "*" is a no-op.
func (fs fieldsSet) remove(field string) {
	if field == "*" {
		fs.reset()
		return
	}
	if fs.contains("*") {
		// A single field cannot be subtracted from "all the fields".
		return
	}
	delete(fs, field)
}

// addAll adds every field from fields to fs. See add() for details.
func (fs fieldsSet) addAll(fields []string) {
	for _, f := range fields {
		fs.add(f)
	}
}

// add adds field to fs.
//
// Adding to a set which already holds "*" is a no-op.
// Adding "*" drops all the other entries, so "*" becomes the only entry.
func (fs fieldsSet) add(field string) {
	if fs.contains("*") {
		return
	}
	if field == "*" {
		// "*" supersedes all the previously added fields.
		fs.reset()
	}
	fs[field] = struct{}{}
}

View file

@ -0,0 +1,94 @@
package logstorage
import (
"reflect"
"testing"
)
// TestFieldsSet exercises fieldsSet step by step on a single shared set:
// plain add/remove/contains, the special "*" entry, the bulk helpers and clone.
//
// The steps depend on each other, since every section starts from the state
// left by the previous one.
func TestFieldsSet(t *testing.T) {
fs := newFieldsSet()
// verify add, remove and contains on a set without the "*" entry
if fs.contains("*") {
t.Fatalf("fs mustn't contain *")
}
if fs.contains("foo") {
t.Fatalf("fs musn't contain foo")
}
fs.add("foo")
fs.add("bar")
// String() is expected to list the entries in sorted order
s := fs.String()
if s != "[bar,foo]" {
t.Fatalf("unexpected String() result; got %s; want %s", s, "[bar,foo]")
}
if !fs.contains("foo") {
t.Fatalf("fs must contain foo")
}
if !fs.contains("bar") {
t.Fatalf("fs must contain bar")
}
if fs.contains("baz") {
t.Fatalf("fs musn't contain baz")
}
if fs.contains("*") {
t.Fatalf("fs mustn't contain *")
}
fs.remove("foo")
if fs.contains("foo") {
t.Fatalf("fs mustn't contain foo")
}
fs.remove("bar")
if fs.contains("bar") {
t.Fatalf("fs mustn't contain bar")
}
// verify *: once it is added, the set must report every field as contained
fs.add("*")
if !fs.contains("*") {
t.Fatalf("fs must contain *")
}
if !fs.contains("foo") || !fs.contains("bar") || !fs.contains("baz") {
t.Fatalf("fs must contain anything")
}
// removing a single field from the "*" set must have no effect
fs.remove("foo")
if !fs.contains("foo") {
t.Fatalf("fs must contain anything")
}
// removing "*" must leave the set empty
fs.remove("*")
if fs.contains("foo") || fs.contains("bar") || fs.contains("baz") {
t.Fatalf("fs must be empty")
}
// verify addAll, getAll, removeAll
fs.addAll([]string{"foo", "bar"})
if !fs.contains("foo") || !fs.contains("bar") {
t.Fatalf("fs must contain foo and bar")
}
// getAll() is expected to return the entries in sorted order
a := fs.getAll()
if !reflect.DeepEqual(a, []string{"bar", "foo"}) {
t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"bar", "foo"})
}
// "baz" is missing in the set; removing it must be harmless
fs.removeAll([]string{"bar", "baz"})
if fs.contains("bar") || fs.contains("baz") {
t.Fatalf("fs mustn't contain bar and baz")
}
if !fs.contains("foo") {
t.Fatalf("fs must contain foo")
}
// verify clone: the copy must be equal to the source and independent of it
fs.addAll([]string{"foo", "bar", "baz"})
fsStr := fs.String()
fsCopy := fs.clone()
fsCopyStr := fsCopy.String()
if fsStr != fsCopyStr {
t.Fatalf("unexpected clone result; got %s; want %s", fsCopyStr, fsStr)
}
// modifying the copy mustn't affect the source
fsCopy.remove("foo")
if fsCopy.contains("foo") {
t.Fatalf("fsCopy mustn't contain foo")
}
if !fs.contains("foo") {
t.Fatalf("fs must contain foo")
}
}

View file

@ -202,114 +202,16 @@ func (q *Query) String() string {
} }
func (q *Query) getNeededColumns() []string { func (q *Query) getNeededColumns() []string {
input := []string{"*"} neededFields := newFieldsSet()
dropFields := make(map[string]struct{}) neededFields.add("*")
unneededFields := newFieldsSet()
pipes := q.pipes pipes := q.pipes
for i := len(pipes) - 1; i >= 0; i-- { for i := len(pipes) - 1; i >= 0; i-- {
neededFields, mapping := pipes[i].getNeededFields() pipes[i].updateNeededFields(neededFields, unneededFields)
neededFields = normalizeFields(neededFields)
referredFields := make(map[string]int)
for _, inFields := range mapping {
for _, f := range inFields {
referredFields[f]++
}
} }
for k := range dropFields { return neededFields.getAll()
inFields := mapping[k]
for _, f := range inFields {
referredFields[f]--
}
}
for k, v := range referredFields {
if v == 0 {
dropFields[k] = struct{}{}
}
}
dropFieldsNext := make(map[string]struct{})
for k := range mapping {
if k != "*" && referredFields[k] == 0 {
dropFieldsNext[k] = struct{}{}
}
}
for k := range dropFields {
if referredFields[k] == 0 {
dropFieldsNext[k] = struct{}{}
}
}
if len(dropFields) > 0 {
var neededFieldsDst []string
for _, f := range neededFields {
if _, ok := dropFields[f]; !ok {
neededFieldsDst = append(neededFieldsDst, f)
}
}
neededFields = neededFieldsDst
}
if len(neededFields) == 0 {
input = nil
}
// transform upper input fields to the current input fields according to the given mapping.
if len(input) == 0 || input[0] != "*" {
var dst []string
for _, f := range input {
if a, ok := mapping[f]; ok {
dst = append(dst, a...)
} else {
dst = append(dst, f)
}
}
if a, ok := mapping["*"]; ok {
dst = append(dst, a...)
}
input = normalizeFields(dst)
}
// intersect neededFields with input
if len(neededFields) == 0 || neededFields[0] != "*" {
clear(dropFields)
if len(input) > 0 && input[0] == "*" {
input = neededFields
continue
}
m := make(map[string]struct{})
for _, f := range input {
m[f] = struct{}{}
}
var dst []string
for _, f := range neededFields {
if _, ok := m[f]; ok {
dst = append(dst, f)
}
}
input = dst
} else {
dropFields = dropFieldsNext
}
}
return input
}
func normalizeFields(a []string) []string {
m := make(map[string]struct{}, len(a))
dst := make([]string, 0, len(a))
for _, s := range a {
if s == "*" {
return []string{"*"}
}
if _, ok := m[s]; ok {
continue
}
m[s] = struct{}{}
dst = append(dst, s)
}
return dst
} }
// ParseQuery parses s. // ParseQuery parses s.

View file

@ -3,7 +3,6 @@ package logstorage
import ( import (
"math" "math"
"reflect" "reflect"
"slices"
"testing" "testing"
"time" "time"
) )
@ -1255,25 +1254,3 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | sort by(baz,`) f(`foo | sort by(baz,`)
f(`foo | sort by(bar) foo`) f(`foo | sort by(bar) foo`)
} }
func TestNormalizeFields(t *testing.T) {
f := func(fields, normalizedExpected []string) {
t.Helper()
normalized := normalizeFields(fields)
if !slices.Equal(normalized, normalizedExpected) {
t.Fatalf("unexpected normalized fields for %q; got %q; want %q", fields, normalized, normalizedExpected)
}
}
f(nil, nil)
f([]string{"foo"}, []string{"foo"})
// duplicate fields
f([]string{"foo", "bar", "foo", "x"}, []string{"foo", "bar", "x"})
f([]string{"foo", "foo", "x", "x", "x"}, []string{"foo", "x"})
// star field
f([]string{"*"}, []string{"*"})
f([]string{"foo", "*", "bar"}, []string{"*"})
}

View file

@ -8,11 +8,8 @@ type pipe interface {
// String returns string representation of the pipe. // String returns string representation of the pipe.
String() string String() string
// getNeededFields must return the required input fields alongside the mapping from output fields to input fields for the given pipe. // updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input.
// updateNeededFields(neededFields, unneededFields fieldsSet)
// It must return []string{"*"} if the set of input fields cannot be determined at the given pipe.
// It must return nil map if the pipe doesn't add new fields to the output.
getNeededFields() ([]string, map[string][]string)
// newPipeProcessor must return new pipeProcessor for the given ppBase. // newPipeProcessor must return new pipeProcessor for the given ppBase.
// //

View file

@ -31,12 +31,31 @@ func (pc *pipeCopy) String() string {
return "copy " + strings.Join(a, ", ") return "copy " + strings.Join(a, ", ")
} }
func (pc *pipeCopy) getNeededFields() ([]string, map[string][]string) { func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) {
m := make(map[string][]string, len(pc.srcFields)) m := make(map[string]int)
for i, dstField := range pc.dstFields { for i, dstField := range pc.dstFields {
m[dstField] = append(m[dstField], pc.srcFields[i]) if neededFields.contains(dstField) && !unneededFields.contains(dstField) {
m[pc.srcFields[i]]++
}
}
if neededFields.contains("*") {
// update only unneeded fields
unneededFields.addAll(pc.dstFields)
for i, srcField := range pc.srcFields {
if m[srcField] > 0 {
unneededFields.remove(pc.srcFields[i])
}
}
} else {
// update only needed fields and reset unneeded fields
neededFields.removeAll(pc.dstFields)
for i, srcField := range pc.srcFields {
if m[srcField] > 0 {
neededFields.add(pc.srcFields[i])
}
}
unneededFields.reset()
} }
return []string{"*"}, m
} }
func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -0,0 +1,82 @@
package logstorage
import (
"strings"
"testing"
)
func TestPipeCopyUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
lex.nextToken()
p, err := parsePipeCopy(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("copy s1 d1, s2 d2", "*", "", "*", "d1,d2")
// all the needed fields, unneeded fields do not intersect with src and dst
f("copy s1 d1 ,s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with src
f("copy s1 d1 ,s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with dst
f("copy s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with src and dst
f("copy s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1")
f("copy s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2")
// needed fields do not intersect with src and dst
f("copy s1 d1, s2 d2", "f1,f2", "", "f1,f2", "")
// needed fields intersect with src
f("copy s1 d1, s2 d2", "s1,f1,f2", "", "s1,f1,f2", "")
// needed fields intersect with dst
f("copy s1 d1, s2 d2", "d1,f1,f2", "", "f1,f2,s1", "")
// needed fields intersect with src and dst
f("copy s1 d1, s2 d2", "s1,d1,f1,f2", "", "s1,f1,f2", "")
f("copy s1 d1, s2 d2", "s1,d2,f1,f2", "", "s1,s2,f1,f2", "")
f("copy s1 d1, s2 d2", "s2,d1,f1,f2", "", "s1,s2,f1,f2", "")
}
// assertNeededFields fails the test if nfs or unfs differ from the sets described
// by the comma-separated neededFieldsExpected and unneededFieldsExpected lists.
//
// The sets are compared via their String() representations; needed fields are checked first.
func assertNeededFields(t *testing.T, nfs, unfs fieldsSet, neededFieldsExpected, unneededFieldsExpected string) {
	t.Helper()

	gotNeeded, gotUnneeded := nfs.String(), unfs.String()
	wantNeeded := newTestFieldsSet(neededFieldsExpected).String()
	wantUnneeded := newTestFieldsSet(unneededFieldsExpected).String()

	if gotNeeded != wantNeeded {
		t.Fatalf("unexpected needed fields; got %s; want %s", gotNeeded, wantNeeded)
	}
	if gotUnneeded != wantUnneeded {
		t.Fatalf("unexpected unneeded fields; got %s; want %s", gotUnneeded, wantUnneeded)
	}
}
// newTestFieldsSet builds a fieldsSet from the comma-separated fields list.
//
// An empty string results in an empty set.
func newTestFieldsSet(fields string) fieldsSet {
	set := newFieldsSet()
	if fields == "" {
		return set
	}
	names := strings.Split(fields, ",")
	set.addAll(names)
	return set
}

View file

@ -22,12 +22,14 @@ func (pd *pipeDelete) String() string {
return "delete " + fieldNamesString(pd.fields) return "delete " + fieldNamesString(pd.fields)
} }
func (pd *pipeDelete) getNeededFields() ([]string, map[string][]string) { func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) {
m := make(map[string][]string, len(pd.fields)) if neededFields.contains("*") {
for _, f := range pd.fields { // update only unneeded fields
m[f] = nil unneededFields.addAll(pd.fields)
} else {
// update only needed fields
neededFields.removeAll(pd.fields)
} }
return []string{"*"}, m
} }
func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -0,0 +1,39 @@
package logstorage
import (
"testing"
)
func TestPipeDeleteUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
lex.nextToken()
p, err := parsePipeDelete(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("del s1,s2", "*", "", "*", "s1,s2")
// all the needed fields, unneeded fields do not intersect with src
f("del s1,s2", "*", "f1,f2", "*", "s1,s2,f1,f2")
// all the needed fields, unneeded fields intersect with src
f("del s1,s2", "*", "s1,f1,f2", "*", "s1,s2,f1,f2")
// needed fields do not intersect with src
f("del s1,s2", "f1,f2", "", "f1,f2", "")
// needed fields intersect with src
f("del s1,s2", "s1,f1,f2", "", "f1,f2", "")
}

View file

@ -25,11 +25,28 @@ func (pf *pipeFields) String() string {
return "fields " + fieldNamesString(pf.fields) return "fields " + fieldNamesString(pf.fields)
} }
func (pf *pipeFields) getNeededFields() ([]string, map[string][]string) { func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet) {
if pf.containsStar { if pf.containsStar {
return []string{"*"}, nil return
} }
return pf.fields, nil if neededFields.contains("*") {
// subtract unneeded fields from pf.fields
neededFields.reset()
neededFields.addAll(pf.fields)
for _, f := range unneededFields.getAll() {
neededFields.remove(f)
}
} else {
// intersect needed fields with pf.fields
neededFieldsOrig := neededFields.clone()
neededFields.reset()
for _, f := range pf.fields {
if neededFieldsOrig.contains(f) {
neededFields.add(f)
}
}
}
unneededFields.reset()
} }
func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -0,0 +1,43 @@
package logstorage
import (
"testing"
)
func TestPipeFieldsUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
lex.nextToken()
p, err := parsePipeFields(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("fields s1, s2", "*", "", "s1,s2", "")
f("fields *", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("fields s1, s2", "*", "f1,f2", "s1,s2", "")
f("fields *", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("fields s1, s2", "*", "s1,f1,f2", "s2", "")
f("fields *", "*", "s1,f1,f2", "*", "s1,f1,f2")
// needed fields do not intersect with src
f("fields s1, s2", "f1,f2", "", "", "")
// needed fields intersect with src
f("fields s1, s2", "s1,f1,f2", "", "s1", "")
f("fields *", "s1,f1,f2", "", "s1,f1,f2", "")
}

View file

@ -16,8 +16,7 @@ func (pl *pipeLimit) String() string {
return fmt.Sprintf("limit %d", pl.n) return fmt.Sprintf("limit %d", pl.n)
} }
func (pl *pipeLimit) getNeededFields() ([]string, map[string][]string) { func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) {
return []string{"*"}, nil
} }
func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -16,8 +16,7 @@ func (po *pipeOffset) String() string {
return fmt.Sprintf("offset %d", po.n) return fmt.Sprintf("offset %d", po.n)
} }
func (po *pipeOffset) getNeededFields() ([]string, map[string][]string) { func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
return []string{"*"}, nil
} }
func (po *pipeOffset) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (po *pipeOffset) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -31,17 +31,33 @@ func (pr *pipeRename) String() string {
return "rename " + strings.Join(a, ", ") return "rename " + strings.Join(a, ", ")
} }
func (pr *pipeRename) getNeededFields() ([]string, map[string][]string) { func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) {
m := make(map[string][]string, len(pr.srcFields)+len(pr.dstFields)) m := make(map[string]int)
for i, dstField := range pr.dstFields { for i, dstField := range pr.dstFields {
m[dstField] = append(m[dstField], pr.srcFields[i]) if neededFields.contains(dstField) && !unneededFields.contains(dstField) {
} m[pr.srcFields[i]]++
for _, srcField := range pr.srcFields {
if _, ok := m[srcField]; !ok {
m[srcField] = nil
} }
} }
return []string{"*"}, m if neededFields.contains("*") {
// update only unneeded fields
unneededFields.addAll(pr.dstFields)
for i, srcField := range pr.srcFields {
if m[srcField] > 0 {
unneededFields.remove(pr.srcFields[i])
}
}
} else {
// update only needed fields and reset unneeded fields
neededFields.removeAll(pr.dstFields)
for i, srcField := range pr.srcFields {
if m[srcField] > 0 {
neededFields.add(pr.srcFields[i])
} else {
neededFields.remove(pr.srcFields[i])
}
}
unneededFields.reset()
}
} }
func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -0,0 +1,55 @@
package logstorage
import (
"testing"
)
func TestPipeRenameUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
lex.nextToken()
p, err := parsePipeRename(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("rename s1 d1, s2 d2", "*", "", "*", "d1,d2")
// all the needed fields, unneeded fields do not intersect with src and dst
f("rename s1 d1, s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with src
// mv s1 d1, s2 d2 | rm s1, f1, f2 (d1, d2, f1, f2)
f("rename s1 d1, s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with dst
f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with src and dst
f("rename s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1")
f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2")
// needed fields do not intersect with src and dst
f("rename s1 d1, s2 d2", "f1,f2", "", "f1,f2", "")
// needed fields intersect with src
f("rename s1 d1, s2 d2", "s1,f1,f2", "", "f1,f2", "")
// needed fields intersect with dst
f("rename s1 d1, s2 d2", "d1,f1,f2", "", "f1,f2,s1", "")
// needed fields intersect with src and dst
f("rename s1 d1, s2 d2", "s1,d1,f1,f2", "", "s1,f1,f2", "")
f("rename s1 d1, s2 d2", "s1,d2,f1,f2", "", "s2,f1,f2", "")
f("rename s1 d1, s2 d2", "s2,d1,f1,f2", "", "s1,f1,f2", "")
}

View file

@ -41,23 +41,16 @@ func (ps *pipeSort) String() string {
return s return s
} }
func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) { func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
byFields := ps.byFields if len(ps.byFields) == 0 {
neededFields.add("*")
if len(byFields) == 0 { unneededFields.reset()
return []string{"*"}, map[string][]string{ } else {
"*": {"*"}, for _, bf := range ps.byFields {
neededFields.add(bf.name)
unneededFields.remove(bf.name)
} }
} }
fields := make([]string, len(byFields))
for i, bf := range byFields {
fields[i] = bf.name
}
m := map[string][]string{
"*": fields,
}
return []string{"*"}, m
} }
func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -0,0 +1,39 @@
package logstorage
import (
"testing"
)
func TestPipeSortUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
lex.nextToken()
p, err := parsePipeSort(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("sort by(s1,s2)", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("sort by(s1,s2)", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("sort by(s1,s2)", "*", "s1,f1,f2", "*", "f1,f2")
// needed fields do not intersect with src
f("sort by(s1,s2)", "f1,f2", "", "s1,s2,f1,f2", "")
// needed fields intersect with src
f("sort by(s1,s2)", "s1,f1,f2", "", "s1,s2,f1,f2", "")
}

View file

@ -83,25 +83,24 @@ func (ps *pipeStats) String() string {
return s return s
} }
func (ps *pipeStats) getNeededFields() ([]string, map[string][]string) { func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
var byFields []string neededFieldsOrig := neededFields.clone()
for _, bf := range ps.byFields { neededFields.reset()
byFields = append(byFields, bf.name)
byFields := make([]string, len(ps.byFields))
for i, bf := range ps.byFields {
byFields[i] = bf.name
} }
neededFields := append([]string{}, byFields...) for i, resultName := range ps.resultNames {
m := make(map[string][]string) if neededFieldsOrig.contains(resultName) && !unneededFields.contains(resultName) {
for i, f := range ps.funcs { funcFields := ps.funcs[i].neededFields()
funcFields := f.neededFields() neededFields.addAll(byFields)
neededFields.addAll(funcFields)
neededFields = append(neededFields, funcFields...) }
resultName := ps.resultNames[i]
m[resultName] = append(m[resultName], byFields...)
m[resultName] = append(m[resultName], funcFields...)
} }
return neededFields, m unneededFields.reset()
} }
const stateSizeBudgetChunk = 1 << 20 const stateSizeBudgetChunk = 1 << 20

View file

@ -4,116 +4,61 @@ import (
"testing" "testing"
) )
func TestTryParseBucketSize_Success(t *testing.T) { func TestPipeStatsUpdateNeededFields(t *testing.T) {
f := func(s string, resultExpected float64) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper() t.Helper()
result, ok := tryParseBucketSize(s) nfs := newTestFieldsSet(neededFields)
if !ok { unfs := newTestFieldsSet(unneededFields)
t.Fatalf("cannot parse %q", s)
} lex := newLexer(s)
if result != resultExpected { lex.nextToken()
t.Fatalf("unexpected result; got %f; want %f", result, resultExpected) p, err := parsePipeStats(lex)
if err != nil {
t.Fatalf("unexpected error when parsing %s: %s", s, err)
} }
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
} }
// integers // all the needed fields
f("0", 0) f("stats count() r1", "*", "", "", "")
f("123", 123) f("stats count(*) r1", "*", "", "", "")
f("1_234_678", 1234678) f("stats count(f1,f2) r1", "*", "", "f1,f2", "")
f("-1_234_678", -1234678) f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "", "f1,f2,f3,f4", "")
f("stats by (b1,b2) count(f1,f2) r1", "*", "", "b1,b2,f1,f2", "")
f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "", "b1,b2,f1,f2,f3", "")
// floating-point numbers // all the needed fields, unneeded fields do not intersect with stats fields
f("0.0", 0) f("stats count() r1", "*", "f1,f2", "", "")
f("123.435", 123.435) f("stats count(*) r1", "*", "f1,f2", "", "")
f("1_000.433_344", 1000.433344) f("stats count(f1,f2) r1", "*", "f3,f4", "f1,f2", "")
f("-1_000.433_344", -1000.433344) f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "f5,f6", "f1,f2,f3,f4", "")
f("stats by (b1,b2) count(f1,f2) r1", "*", "f3,f4", "b1,b2,f1,f2", "")
f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "f4,f5", "b1,b2,f1,f2,f3", "")
// durations // all the needed fields, unneeded fields intersect with stats fields
f("5m", 5*nsecsPerMinute) f("stats count() r1", "*", "r1,r2", "", "")
f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond) f("stats count(*) r1", "*", "r1,r2", "", "")
f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond)) f("stats count(f1,f2) r1", "*", "r1,r2", "", "")
f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "r1,r3", "f3,f4", "")
f("stats by (b1,b2) count(f1,f2) r1", "*", "r1,r2", "", "")
f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "r1,r3", "b1,b2,f1,f3", "")
// bytes // needed fields do not intersect with stats fields
f("1B", 1) f("stats count() r1", "r2", "", "", "")
f("1K", 1_000) f("stats count(*) r1", "r2", "", "", "")
f("1KB", 1_000) f("stats count(f1,f2) r1", "r2", "", "", "")
f("5.5KiB", 5.5*(1<<10)) f("stats count(f1,f2) r1, sum(f3,f4) r2", "r3", "", "", "")
f("10MB500KB10B", 10*1_000_000+500*1_000+10) f("stats by (b1,b2) count(f1,f2) r1", "r2", "", "", "")
f("10M", 10*1_000_000) f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "r3", "", "", "")
f("-10MB", -10*1_000_000)
// ipv4 mask // needed fields intersect with stats fields
f("/0", 1<<32) f("stats count() r1", "r1,r2", "", "", "")
f("/32", 1) f("stats count(*) r1", "r1,r2", "", "", "")
f("/16", 1<<16) f("stats count(f1,f2) r1", "r1,r2", "", "f1,f2", "")
f("/8", 1<<24) f("stats count(f1,f2) r1, sum(f3,f4) r2", "r1,r3", "", "f1,f2", "")
} f("stats by (b1,b2) count(f1,f2) r1", "r1,r2", "", "b1,b2,f1,f2", "")
f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "r1,r3", "", "b1,b2,f1,f2", "")
func TestTryParseBucketSize_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseBucketSize(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
f("")
f("foo")
}
func TestTryParseBucketOffset_Success(t *testing.T) {
f := func(s string, resultExpected float64) {
t.Helper()
result, ok := tryParseBucketOffset(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if result != resultExpected {
t.Fatalf("unexpected result; got %f; want %f", result, resultExpected)
}
}
// integers
f("0", 0)
f("123", 123)
f("1_234_678", 1234678)
f("-1_234_678", -1234678)
// floating-point numbers
f("0.0", 0)
f("123.435", 123.435)
f("1_000.433_344", 1000.433344)
f("-1_000.433_344", -1000.433344)
// durations
f("5m", 5*nsecsPerMinute)
f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond)
f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond))
// bytes
f("1B", 1)
f("1K", 1_000)
f("1KB", 1_000)
f("5.5KiB", 5.5*(1<<10))
f("10MB500KB10B", 10*1_000_000+500*1_000+10)
f("10M", 10*1_000_000)
f("-10MB", -10*1_000_000)
}
func TestTryParseBucketOffset_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseBucketOffset(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
f("")
f("foo")
} }