From 48ed6abe7575e74726dfb5c99d95a3d5c0751d4a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 9 May 2024 02:52:28 +0200 Subject: [PATCH] wip --- lib/logstorage/fields_set.go | 80 +++++++++++++++ lib/logstorage/fields_set_test.go | 94 ++++++++++++++++++ lib/logstorage/parser.go | 108 +-------------------- lib/logstorage/parser_test.go | 23 ----- lib/logstorage/pipe.go | 7 +- lib/logstorage/pipe_copy.go | 27 +++++- lib/logstorage/pipe_copy_test.go | 82 ++++++++++++++++ lib/logstorage/pipe_delete.go | 12 ++- lib/logstorage/pipe_delete_test.go | 39 ++++++++ lib/logstorage/pipe_fields.go | 23 ++++- lib/logstorage/pipe_fields_test.go | 43 ++++++++ lib/logstorage/pipe_limit.go | 3 +- lib/logstorage/pipe_offset.go | 3 +- lib/logstorage/pipe_rename.go | 32 ++++-- lib/logstorage/pipe_rename_test.go | 55 +++++++++++ lib/logstorage/pipe_sort.go | 23 ++--- lib/logstorage/pipe_sort_test.go | 39 ++++++++ lib/logstorage/pipe_stats.go | 29 +++--- lib/logstorage/pipe_stats_test.go | 151 +++++++++-------------------- 19 files changed, 585 insertions(+), 288 deletions(-) create mode 100644 lib/logstorage/fields_set.go create mode 100644 lib/logstorage/fields_set_test.go create mode 100644 lib/logstorage/pipe_copy_test.go create mode 100644 lib/logstorage/pipe_delete_test.go create mode 100644 lib/logstorage/pipe_fields_test.go create mode 100644 lib/logstorage/pipe_rename_test.go create mode 100644 lib/logstorage/pipe_sort_test.go diff --git a/lib/logstorage/fields_set.go b/lib/logstorage/fields_set.go new file mode 100644 index 000000000..e8d364a30 --- /dev/null +++ b/lib/logstorage/fields_set.go @@ -0,0 +1,80 @@ +package logstorage + +import ( + "sort" + "strings" +) + +type fieldsSet map[string]struct{} + +func newFieldsSet() fieldsSet { + return fieldsSet(map[string]struct{}{}) +} + +func (fs fieldsSet) reset() { + clear(fs) +} + +func (fs fieldsSet) String() string { + a := fs.getAll() + return "[" + strings.Join(a, ",") + "]" +} + +func (fs fieldsSet) clone() fieldsSet { + fsNew := newFieldsSet() + for _, f := range fs.getAll() { + fsNew.add(f) + } + return fsNew +} + +func (fs fieldsSet) getAll() []string { + a := make([]string, 0, len(fs)) + for f := range fs { + a = append(a, f) + } + sort.Strings(a) + return a +} + +func (fs fieldsSet) contains(field string) bool { + _, ok := fs[field] + if !ok { + _, ok = fs["*"] + } + return ok +} + +func (fs fieldsSet) removeAll(fields []string) { + for _, f := range fields { + fs.remove(f) + } +} + +func (fs fieldsSet) remove(field string) { + if field == "*" { + fs.reset() + return + } + if !fs.contains("*") { + delete(fs, field) + } +} + +func (fs fieldsSet) addAll(fields []string) { + for _, f := range fields { + fs.add(f) + } +} + +func (fs fieldsSet) add(field string) { + if fs.contains("*") { + return + } + if field == "*" { + fs.reset() + fs["*"] = struct{}{} + return + } + fs[field] = struct{}{} +} diff --git a/lib/logstorage/fields_set_test.go b/lib/logstorage/fields_set_test.go new file mode 100644 index 000000000..73b39f8af --- /dev/null +++ b/lib/logstorage/fields_set_test.go @@ -0,0 +1,94 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestFieldsSet(t *testing.T) { + fs := newFieldsSet() + + // verify add, remove and contains + if fs.contains("*") { + t.Fatalf("fs mustn't contain *") + } + if fs.contains("foo") { + t.Fatalf("fs musn't contain foo") + } + fs.add("foo") + fs.add("bar") + s := fs.String() + if s != "[bar,foo]" { + t.Fatalf("unexpected String() result; got %s; want %s", s, "[bar,foo]") + } + if 
!fs.contains("foo") {
+		t.Fatalf("fs must contain foo")
+	}
+	if !fs.contains("bar") {
+		t.Fatalf("fs must contain bar")
+	}
+	if fs.contains("baz") {
+		t.Fatalf("fs mustn't contain baz")
+	}
+	if fs.contains("*") {
+		t.Fatalf("fs mustn't contain *")
+	}
+	fs.remove("foo")
+	if fs.contains("foo") {
+		t.Fatalf("fs mustn't contain foo")
+	}
+	fs.remove("bar")
+	if fs.contains("bar") {
+		t.Fatalf("fs mustn't contain bar")
+	}
+
+	// verify *
+	fs.add("*")
+	if !fs.contains("*") {
+		t.Fatalf("fs must contain *")
+	}
+	if !fs.contains("foo") || !fs.contains("bar") || !fs.contains("baz") {
+		t.Fatalf("fs must contain anything")
+	}
+	fs.remove("foo")
+	if !fs.contains("foo") {
+		t.Fatalf("fs must contain anything")
+	}
+	fs.remove("*")
+	if fs.contains("foo") || fs.contains("bar") || fs.contains("baz") {
+		t.Fatalf("fs must be empty")
+	}
+
+	// verify addAll, getAll, removeAll
+	fs.addAll([]string{"foo", "bar"})
+	if !fs.contains("foo") || !fs.contains("bar") {
+		t.Fatalf("fs must contain foo and bar")
+	}
+	a := fs.getAll()
+	if !reflect.DeepEqual(a, []string{"bar", "foo"}) {
+		t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"bar", "foo"})
+	}
+	fs.removeAll([]string{"bar", "baz"})
+	if fs.contains("bar") || fs.contains("baz") {
+		t.Fatalf("fs mustn't contain bar and baz")
+	}
+	if !fs.contains("foo") {
+		t.Fatalf("fs must contain foo")
+	}
+
+	// verify clone
+	fs.addAll([]string{"foo", "bar", "baz"})
+	fsStr := fs.String()
+	fsCopy := fs.clone()
+	fsCopyStr := fsCopy.String()
+	if fsStr != fsCopyStr {
+		t.Fatalf("unexpected clone result; got %s; want %s", fsCopyStr, fsStr)
+	}
+	fsCopy.remove("foo")
+	if fsCopy.contains("foo") {
+		t.Fatalf("fsCopy mustn't contain foo")
+	}
+	if !fs.contains("foo") {
+		t.Fatalf("fs must contain foo")
+	}
+}
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 9df8a7d2a..04c910e71 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -202,114 +202,16 @@ func (q *Query) String() string {
 }
 
 func (q *Query) getNeededColumns() []string {
-	input := []string{"*"}
-	dropFields := make(map[string]struct{})
+	neededFields := newFieldsSet()
+	neededFields.add("*")
+	unneededFields := newFieldsSet()
 
 	pipes := q.pipes
 	for i := len(pipes) - 1; i >= 0; i-- {
-		neededFields, mapping := pipes[i].getNeededFields()
-		neededFields = normalizeFields(neededFields)
-
-		referredFields := make(map[string]int)
-		for _, inFields := range mapping {
-			for _, f := range inFields {
-				referredFields[f]++
-			}
-		}
-
-		for k := range dropFields {
-			inFields := mapping[k]
-			for _, f := range inFields {
-				referredFields[f]--
-			}
-		}
-		for k, v := range referredFields {
-			if v == 0 {
-				dropFields[k] = struct{}{}
-			}
-		}
-		dropFieldsNext := make(map[string]struct{})
-		for k := range mapping {
-			if k != "*" && referredFields[k] == 0 {
-				dropFieldsNext[k] = struct{}{}
-			}
-		}
-		for k := range dropFields {
-			if referredFields[k] == 0 {
-				dropFieldsNext[k] = struct{}{}
-			}
-		}
-
-		if len(dropFields) > 0 {
-			var neededFieldsDst []string
-			for _, f := range neededFields {
-				if _, ok := dropFields[f]; !ok {
-					neededFieldsDst = append(neededFieldsDst, f)
-				}
-			}
-			neededFields = neededFieldsDst
-		}
-
-		if len(neededFields) == 0 {
-			input = nil
-		}
-
-		// transform upper input fields to the current input fields according to the given mapping.
-		if len(input) == 0 || input[0] != "*" {
-			var dst []string
-			for _, f := range input {
-				if a, ok := mapping[f]; ok {
-					dst = append(dst, a...)
-				} else {
-					dst = append(dst, f)
-				}
-			}
-			if a, ok := mapping["*"]; ok {
-				dst = append(dst, a...)
-			}
-			input = normalizeFields(dst)
-		}
-
-		// intersect neededFields with input
-		if len(neededFields) == 0 || neededFields[0] != "*" {
-			clear(dropFields)
-			if len(input) > 0 && input[0] == "*" {
-				input = neededFields
-				continue
-			}
-			m := make(map[string]struct{})
-			for _, f := range input {
-				m[f] = struct{}{}
-			}
-			var dst []string
-			for _, f := range neededFields {
-				if _, ok := m[f]; ok {
-					dst = append(dst, f)
-				}
-			}
-			input = dst
-		} else {
-			dropFields = dropFieldsNext
-		}
+		pipes[i].updateNeededFields(neededFields, unneededFields)
 	}
 
-	return input
-}
-
-func normalizeFields(a []string) []string {
-	m := make(map[string]struct{}, len(a))
-	dst := make([]string, 0, len(a))
-	for _, s := range a {
-		if s == "*" {
-			return []string{"*"}
-		}
-		if _, ok := m[s]; ok {
-			continue
-		}
-		m[s] = struct{}{}
-		dst = append(dst, s)
-	}
-	return dst
+	return neededFields.getAll()
 }
 
 // ParseQuery parses s.
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index bc40c3326..a78fdf46d 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -3,7 +3,6 @@ package logstorage
 import (
 	"math"
 	"reflect"
-	"slices"
 	"testing"
 	"time"
 )
@@ -1255,25 +1254,3 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | sort by(baz,`)
 	f(`foo | sort by(bar) foo`)
 }
-
-func TestNormalizeFields(t *testing.T) {
-	f := func(fields, normalizedExpected []string) {
-		t.Helper()
-
-		normalized := normalizeFields(fields)
-		if !slices.Equal(normalized, normalizedExpected) {
-			t.Fatalf("unexpected normalized fields for %q; got %q; want %q", fields, normalized, normalizedExpected)
-		}
-	}
-
-	f(nil, nil)
-	f([]string{"foo"}, []string{"foo"})
-
-	// duplicate fields
-	f([]string{"foo", "bar", "foo", "x"}, []string{"foo", "bar", "x"})
-	f([]string{"foo", "foo", "x", "x", "x"}, []string{"foo", "x"})
-
-	// star field
-	f([]string{"*"}, []string{"*"})
-	f([]string{"foo", "*", "bar"}, []string{"*"})
-}
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 09503cfce..c1576f465 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -8,11 +8,8 @@ type pipe interface {
 	// String returns string representation of the pipe.
 	String() string
 
-	// getNeededFields must return the required input fields alongside the mapping from output fields to input fields for the given pipe.
-	//
-	// It must return []string{"*"} if the set of input fields cannot be determined at the given pipe.
-	// It must return nil map if the pipe doesn't add new fields to the output.
-	getNeededFields() ([]string, map[string][]string)
+	// updateNeededFields must update neededFields and unneededFields with the fields the pipe needs and doesn't need at its input.
+	updateNeededFields(neededFields, unneededFields fieldsSet)
 
 	// newPipeProcessor must return new pipeProcessor for the given ppBase.
// diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 6d0c4c459..a9730b4c1 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -31,12 +31,31 @@ func (pc *pipeCopy) String() string { return "copy " + strings.Join(a, ", ") } -func (pc *pipeCopy) getNeededFields() ([]string, map[string][]string) { - m := make(map[string][]string, len(pc.srcFields)) +func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) { + m := make(map[string]int) for i, dstField := range pc.dstFields { - m[dstField] = append(m[dstField], pc.srcFields[i]) + if neededFields.contains(dstField) && !unneededFields.contains(dstField) { + m[pc.srcFields[i]]++ + } + } + if neededFields.contains("*") { + // update only unneeded fields + unneededFields.addAll(pc.dstFields) + for i, srcField := range pc.srcFields { + if m[srcField] > 0 { + unneededFields.remove(pc.srcFields[i]) + } + } + } else { + // update only needed fields and reset unneeded fields + neededFields.removeAll(pc.dstFields) + for i, srcField := range pc.srcFields { + if m[srcField] > 0 { + neededFields.add(pc.srcFields[i]) + } + } + unneededFields.reset() } - return []string{"*"}, m } func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_copy_test.go b/lib/logstorage/pipe_copy_test.go new file mode 100644 index 000000000..e29aece0b --- /dev/null +++ b/lib/logstorage/pipe_copy_test.go @@ -0,0 +1,82 @@ +package logstorage + +import ( + "strings" + "testing" +) + +func TestPipeCopyUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeCopy(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("copy s1 d1, s2 d2", "*", "", "*", "d1,d2") + + // all the needed fields, unneeded fields do not intersect with src and dst + f("copy s1 d1 ,s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("copy s1 d1 ,s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with dst + f("copy s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with src and dst + f("copy s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1") + f("copy s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2") + + // needed fields do not intersect with src and dst + f("copy s1 d1, s2 d2", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with src + f("copy s1 d1, s2 d2", "s1,f1,f2", "", "s1,f1,f2", "") + + // needed fields intersect with dst + f("copy s1 d1, s2 d2", "d1,f1,f2", "", "f1,f2,s1", "") + + // needed fields intersect with src and dst + f("copy s1 d1, s2 d2", "s1,d1,f1,f2", "", "s1,f1,f2", "") + f("copy s1 d1, s2 d2", "s1,d2,f1,f2", "", "s1,s2,f1,f2", "") + f("copy s1 d1, s2 d2", "s2,d1,f1,f2", "", "s1,s2,f1,f2", "") +} + +func assertNeededFields(t *testing.T, nfs, unfs fieldsSet, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfsStr := nfs.String() + unfsStr := unfs.String() + + nfsExpected := newTestFieldsSet(neededFieldsExpected) + 
unfsExpected := newTestFieldsSet(unneededFieldsExpected) + nfsExpectedStr := nfsExpected.String() + unfsExpectedStr := unfsExpected.String() + + if nfsStr != nfsExpectedStr { + t.Fatalf("unexpected needed fields; got %s; want %s", nfsStr, nfsExpectedStr) + } + if unfsStr != unfsExpectedStr { + t.Fatalf("unexpected unneeded fields; got %s; want %s", unfsStr, unfsExpectedStr) + } +} + +func newTestFieldsSet(fields string) fieldsSet { + fs := newFieldsSet() + if fields != "" { + fs.addAll(strings.Split(fields, ",")) + } + return fs +} diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 5bdf603fa..efcca0da3 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -22,12 +22,14 @@ func (pd *pipeDelete) String() string { return "delete " + fieldNamesString(pd.fields) } -func (pd *pipeDelete) getNeededFields() ([]string, map[string][]string) { - m := make(map[string][]string, len(pd.fields)) - for _, f := range pd.fields { - m[f] = nil +func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + // update only unneeded fields + unneededFields.addAll(pd.fields) + } else { + // update only needed fields + neededFields.removeAll(pd.fields) } - return []string{"*"}, m } func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_delete_test.go b/lib/logstorage/pipe_delete_test.go new file mode 100644 index 000000000..5124a5fcc --- /dev/null +++ b/lib/logstorage/pipe_delete_test.go @@ -0,0 +1,39 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeDeleteUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeDelete(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("del s1,s2", "*", "", "*", "s1,s2") + + // all the needed fields, unneeded fields do not intersect with src + f("del s1,s2", "*", "f1,f2", "*", "s1,s2,f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("del s1,s2", "*", "s1,f1,f2", "*", "s1,s2,f1,f2") + + // needed fields do not intersect with src + f("del s1,s2", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with src + f("del s1,s2", "s1,f1,f2", "", "f1,f2", "") +} diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index a1c9b3065..20a5a15de 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -25,11 +25,28 @@ func (pf *pipeFields) String() string { return "fields " + fieldNamesString(pf.fields) } -func (pf *pipeFields) getNeededFields() ([]string, map[string][]string) { +func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet) { if pf.containsStar { - return []string{"*"}, nil + return } - return pf.fields, nil + if neededFields.contains("*") { + // subtract unneeded fields from pf.fields + neededFields.reset() + neededFields.addAll(pf.fields) + for _, f := range unneededFields.getAll() { + neededFields.remove(f) + } + } else { + // intersect needed fields with pf.fields + neededFieldsOrig := neededFields.clone() + neededFields.reset() + for _, f := range pf.fields 
{ + if neededFieldsOrig.contains(f) { + neededFields.add(f) + } + } + } + unneededFields.reset() } func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_fields_test.go b/lib/logstorage/pipe_fields_test.go new file mode 100644 index 000000000..ff020f4a2 --- /dev/null +++ b/lib/logstorage/pipe_fields_test.go @@ -0,0 +1,43 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeFieldsUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeFields(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("fields s1, s2", "*", "", "s1,s2", "") + f("fields *", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("fields s1, s2", "*", "f1,f2", "s1,s2", "") + f("fields *", "*", "f1,f2", "*", "f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("fields s1, s2", "*", "s1,f1,f2", "s2", "") + f("fields *", "*", "s1,f1,f2", "*", "s1,f1,f2") + + // needed fields do not intersect with src + f("fields s1, s2", "f1,f2", "", "", "") + + // needed fields intersect with src + f("fields s1, s2", "s1,f1,f2", "", "s1", "") + f("fields *", "s1,f1,f2", "", "s1,f1,f2", "") +} diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index f35af350a..91793b9fb 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -16,8 +16,7 @@ func (pl *pipeLimit) String() string { return fmt.Sprintf("limit %d", pl.n) } -func (pl *pipeLimit) getNeededFields() ([]string, map[string][]string) { - return []string{"*"}, nil +func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) { } func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go index a16620ccc..99af79357 100644 --- a/lib/logstorage/pipe_offset.go +++ b/lib/logstorage/pipe_offset.go @@ -16,8 +16,7 @@ func (po *pipeOffset) String() string { return fmt.Sprintf("offset %d", po.n) } -func (po *pipeOffset) getNeededFields() ([]string, map[string][]string) { - return []string{"*"}, nil +func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) { } func (po *pipeOffset) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index 4b62bd21b..1716ee28b 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -31,17 +31,33 @@ func (pr *pipeRename) String() string { return "rename " + strings.Join(a, ", ") } -func (pr *pipeRename) getNeededFields() ([]string, map[string][]string) { - m := make(map[string][]string, len(pr.srcFields)+len(pr.dstFields)) +func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) { + m := make(map[string]int) for i, dstField := range pr.dstFields { - m[dstField] = append(m[dstField], pr.srcFields[i]) - } - for _, srcField := range pr.srcFields { - if _, ok := m[srcField]; !ok { - m[srcField] = nil + if neededFields.contains(dstField) && 
!unneededFields.contains(dstField) { + m[pr.srcFields[i]]++ } } - return []string{"*"}, m + if neededFields.contains("*") { + // update only unneeded fields + unneededFields.addAll(pr.dstFields) + for i, srcField := range pr.srcFields { + if m[srcField] > 0 { + unneededFields.remove(pr.srcFields[i]) + } + } + } else { + // update only needed fields and reset unneeded fields + neededFields.removeAll(pr.dstFields) + for i, srcField := range pr.srcFields { + if m[srcField] > 0 { + neededFields.add(pr.srcFields[i]) + } else { + neededFields.remove(pr.srcFields[i]) + } + } + unneededFields.reset() + } } func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_rename_test.go b/lib/logstorage/pipe_rename_test.go new file mode 100644 index 000000000..0e92f3605 --- /dev/null +++ b/lib/logstorage/pipe_rename_test.go @@ -0,0 +1,55 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeRenameUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeRename(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("rename s1 d1, s2 d2", "*", "", "*", "d1,d2") + + // all the needed fields, unneeded fields do not intersect with src and dst + f("rename s1 d1, s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with src + // mv s1 d1, s2 d2 | rm s1, f1, f2 (d1, d2, f1, f2) + f("rename s1 d1, s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with dst + f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2") + + // all the needed fields, unneeded fields intersect with src and dst + f("rename s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1") + f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2") + + // needed fields do not intersect with src and dst + f("rename s1 d1, s2 d2", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with src + f("rename s1 d1, s2 d2", "s1,f1,f2", "", "f1,f2", "") + + // needed fields intersect with dst + f("rename s1 d1, s2 d2", "d1,f1,f2", "", "f1,f2,s1", "") + + // needed fields intersect with src and dst + f("rename s1 d1, s2 d2", "s1,d1,f1,f2", "", "s1,f1,f2", "") + f("rename s1 d1, s2 d2", "s1,d2,f1,f2", "", "s2,f1,f2", "") + f("rename s1 d1, s2 d2", "s2,d1,f1,f2", "", "s1,f1,f2", "") +} diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 8d380ad78..5207b0e96 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -41,23 +41,16 @@ func (ps *pipeSort) String() string { return s } -func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) { - byFields := ps.byFields - - if len(byFields) == 0 { - return []string{"*"}, map[string][]string{ - "*": {"*"}, +func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { + if len(ps.byFields) == 0 { + neededFields.add("*") + unneededFields.reset() + } else { + for _, bf := range ps.byFields { + neededFields.add(bf.name) + unneededFields.remove(bf.name) } } - - fields := make([]string, len(byFields)) - for i, bf := range byFields { - fields[i] 
= bf.name - } - m := map[string][]string{ - "*": fields, - } - return []string{"*"}, m } func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_sort_test.go b/lib/logstorage/pipe_sort_test.go new file mode 100644 index 000000000..dd4bcebae --- /dev/null +++ b/lib/logstorage/pipe_sort_test.go @@ -0,0 +1,39 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeSortUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeSort(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("sort by(s1,s2)", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("sort by(s1,s2)", "*", "f1,f2", "*", "f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("sort by(s1,s2)", "*", "s1,f1,f2", "*", "f1,f2") + + // needed fields do not intersect with src + f("sort by(s1,s2)", "f1,f2", "", "s1,s2,f1,f2", "") + + // needed fields intersect with src + f("sort by(s1,s2)", "s1,f1,f2", "", "s1,s2,f1,f2", "") +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 78c56e782..1bdc06446 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -83,25 +83,24 @@ func (ps *pipeStats) String() string { return s } -func (ps *pipeStats) getNeededFields() ([]string, map[string][]string) { - var byFields []string - for _, bf := range ps.byFields { - byFields = append(byFields, bf.name) +func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFieldsOrig := neededFields.clone() + neededFields.reset() + + byFields := make([]string, len(ps.byFields)) + for i, bf := range ps.byFields { + byFields[i] = bf.name } - neededFields := append([]string{}, byFields...) - m := make(map[string][]string) - for i, f := range ps.funcs { - funcFields := f.neededFields() - - neededFields = append(neededFields, funcFields...) - - resultName := ps.resultNames[i] - m[resultName] = append(m[resultName], byFields...) - m[resultName] = append(m[resultName], funcFields...) 
+ for i, resultName := range ps.resultNames { + if neededFieldsOrig.contains(resultName) && !unneededFields.contains(resultName) { + funcFields := ps.funcs[i].neededFields() + neededFields.addAll(byFields) + neededFields.addAll(funcFields) + } } - return neededFields, m + unneededFields.reset() } const stateSizeBudgetChunk = 1 << 20 diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go index a320fca3d..7bf66d083 100644 --- a/lib/logstorage/pipe_stats_test.go +++ b/lib/logstorage/pipe_stats_test.go @@ -4,116 +4,61 @@ import ( "testing" ) -func TestTryParseBucketSize_Success(t *testing.T) { - f := func(s string, resultExpected float64) { +func TestPipeStatsUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() - result, ok := tryParseBucketSize(s) - if !ok { - t.Fatalf("cannot parse %q", s) - } - if result != resultExpected { - t.Fatalf("unexpected result; got %f; want %f", result, resultExpected) + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + lex.nextToken() + p, err := parsePipeStats(lex) + if err != nil { + t.Fatalf("unexpected error when parsing %s: %s", s, err) } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) } - // integers - f("0", 0) - f("123", 123) - f("1_234_678", 1234678) - f("-1_234_678", -1234678) + // all the needed fields + f("stats count() r1", "*", "", "", "") + f("stats count(*) r1", "*", "", "", "") + f("stats count(f1,f2) r1", "*", "", "f1,f2", "") + f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "", "f1,f2,f3,f4", "") + f("stats by (b1,b2) count(f1,f2) r1", "*", "", "b1,b2,f1,f2", "") + f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "", "b1,b2,f1,f2,f3", "") - // floating-point numbers - f("0.0", 0) - f("123.435", 123.435) - f("1_000.433_344", 1000.433344) - f("-1_000.433_344", -1000.433344) + // all the needed fields, unneeded fields do not intersect with stats fields + f("stats count() r1", "*", "f1,f2", "", "") + f("stats count(*) r1", "*", "f1,f2", "", "") + f("stats count(f1,f2) r1", "*", "f3,f4", "f1,f2", "") + f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "f5,f6", "f1,f2,f3,f4", "") + f("stats by (b1,b2) count(f1,f2) r1", "*", "f3,f4", "b1,b2,f1,f2", "") + f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "f4,f5", "b1,b2,f1,f2,f3", "") - // durations - f("5m", 5*nsecsPerMinute) - f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond) - f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond)) + // all the needed fields, unneeded fields intersect with stats fields + f("stats count() r1", "*", "r1,r2", "", "") + f("stats count(*) r1", "*", "r1,r2", "", "") + f("stats count(f1,f2) r1", "*", "r1,r2", "", "") + f("stats count(f1,f2) r1, sum(f3,f4) r2", "*", "r1,r3", "f3,f4", "") + f("stats by (b1,b2) count(f1,f2) r1", "*", "r1,r2", "", "") + f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "*", "r1,r3", "b1,b2,f1,f3", "") - // bytes - f("1B", 1) - f("1K", 1_000) - f("1KB", 1_000) - f("5.5KiB", 5.5*(1<<10)) - f("10MB500KB10B", 10*1_000_000+500*1_000+10) - f("10M", 10*1_000_000) - f("-10MB", -10*1_000_000) + // needed fields do not intersect with stats fields + f("stats count() r1", "r2", "", "", "") + f("stats count(*) r1", "r2", "", "", "") + f("stats count(f1,f2) r1", "r2", "", "", "") + f("stats count(f1,f2) r1, sum(f3,f4) r2", "r3", "", "", "") + f("stats by (b1,b2) 
count(f1,f2) r1", "r2", "", "", "") + f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "r3", "", "", "") - // ipv4 mask - f("/0", 1<<32) - f("/32", 1) - f("/16", 1<<16) - f("/8", 1<<24) -} - -func TestTryParseBucketSize_Failure(t *testing.T) { - f := func(s string) { - t.Helper() - - _, ok := tryParseBucketSize(s) - if ok { - t.Fatalf("expecting error when parsing %q", s) - } - } - - f("") - f("foo") -} - -func TestTryParseBucketOffset_Success(t *testing.T) { - f := func(s string, resultExpected float64) { - t.Helper() - - result, ok := tryParseBucketOffset(s) - if !ok { - t.Fatalf("cannot parse %q", s) - } - if result != resultExpected { - t.Fatalf("unexpected result; got %f; want %f", result, resultExpected) - } - } - - // integers - f("0", 0) - f("123", 123) - f("1_234_678", 1234678) - f("-1_234_678", -1234678) - - // floating-point numbers - f("0.0", 0) - f("123.435", 123.435) - f("1_000.433_344", 1000.433344) - f("-1_000.433_344", -1000.433344) - - // durations - f("5m", 5*nsecsPerMinute) - f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond) - f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond)) - - // bytes - f("1B", 1) - f("1K", 1_000) - f("1KB", 1_000) - f("5.5KiB", 5.5*(1<<10)) - f("10MB500KB10B", 10*1_000_000+500*1_000+10) - f("10M", 10*1_000_000) - f("-10MB", -10*1_000_000) -} - -func TestTryParseBucketOffset_Failure(t *testing.T) { - f := func(s string) { - t.Helper() - - _, ok := tryParseBucketOffset(s) - if ok { - t.Fatalf("expecting error when parsing %q", s) - } - } - - f("") - f("foo") + // needed fields intersect with stats fields + f("stats count() r1", "r1,r2", "", "", "") + f("stats count(*) r1", "r1,r2", "", "", "") + f("stats count(f1,f2) r1", "r1,r2", "", "f1,f2", "") + f("stats count(f1,f2) r1, sum(f3,f4) r2", "r1,r3", "", "f1,f2", "") + f("stats by (b1,b2) count(f1,f2) r1", "r1,r2", "", "b1,b2,f1,f2", "") + f("stats by (b1,b2) count(f1,f2) r1, count(f1,f3) r2", "r1,r3", "", "b1,b2,f1,f2", "") }