From 53a378faab8ff3e31db6de80f498e22484ffd0a8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 17 May 2024 04:11:10 +0200 Subject: [PATCH] wip --- lib/logstorage/bitmap.go | 2 + lib/logstorage/block_result.go | 159 ++++++++++++++--------- lib/logstorage/block_search.go | 4 +- lib/logstorage/fields_set.go | 31 +++-- lib/logstorage/fields_set_test.go | 34 +++-- lib/logstorage/filter.go | 6 + lib/logstorage/filter_and.go | 17 +++ lib/logstorage/filter_any_case_phrase.go | 15 ++- lib/logstorage/filter_any_case_prefix.go | 15 ++- lib/logstorage/filter_exact.go | 136 ++++++++++++++++++- lib/logstorage/filter_exact_prefix.go | 14 +- lib/logstorage/filter_in.go | 102 ++++++++++++++- lib/logstorage/filter_ipv4_range.go | 77 ++++++++++- lib/logstorage/filter_len_range.go | 116 ++++++++++++++++- lib/logstorage/filter_noop.go | 8 ++ lib/logstorage/filter_not.go | 14 ++ lib/logstorage/filter_or.go | 29 +++++ lib/logstorage/filter_phrase.go | 101 +++++++++++++- lib/logstorage/filter_prefix.go | 14 +- lib/logstorage/filter_range.go | 120 ++++++++++++++++- lib/logstorage/filter_regexp.go | 17 ++- lib/logstorage/filter_sequence.go | 21 ++- lib/logstorage/filter_stream.go | 65 +++++++++ lib/logstorage/filter_string_range.go | 24 +++- lib/logstorage/filter_test.go | 5 - lib/logstorage/filter_time.go | 92 +++++++++++++ lib/logstorage/indexdb.go | 4 +- lib/logstorage/parser_test.go | 7 + lib/logstorage/pipe_sort.go | 9 +- lib/logstorage/pipe_stats.go | 159 ++++++++++++++++++----- lib/logstorage/pipe_topk.go | 9 +- lib/logstorage/pipe_uniq.go | 11 +- lib/logstorage/stats_avg.go | 4 +- lib/logstorage/stats_count.go | 6 +- lib/logstorage/stats_count_empty.go | 4 +- lib/logstorage/stats_count_uniq.go | 4 +- lib/logstorage/stats_max.go | 12 +- lib/logstorage/stats_median.go | 4 +- lib/logstorage/stats_min.go | 12 +- lib/logstorage/stats_quantile.go | 4 +- lib/logstorage/stats_sum.go | 4 +- lib/logstorage/stats_sum_len.go | 4 +- lib/logstorage/stats_uniq_values.go | 4 +- lib/logstorage/stats_values.go | 4 +- lib/logstorage/storage_search_test.go | 12 -- lib/logstorage/stream_filter.go | 118 ++++++++++++++++- lib/storage/storage.go | 2 +- 47 files changed, 1416 insertions(+), 219 deletions(-) diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index 01f573b0f..d3d18bfac 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -50,6 +50,8 @@ func (bm *bitmap) copyFrom(src *bitmap) { } func (bm *bitmap) init(bitsLen int) { + bm.reset() + a := bm.a wordsLen := (bitsLen + 63) / 64 a = slicesutil.SetLength(a, wordsLen) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index b3cb9d5f0..48a529316 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -19,21 +19,12 @@ import ( // // It is expected that its contents is accessed only from a single goroutine at a time. type blockResult struct { - // bs is the blockSearch used for fetching block data. - bs *blockSearch - - // bm is bitamp for fetching the needed rows from bs. - bm *bitmap - // a holds all the bytes behind the requested column values in the block. a arena // values holds all the requested column values in the block. valuesBuf []string - // streamID is streamID for the given blockResult. - streamID streamID - // timestamps contain timestamps for the selected log entries in the block. 
timestamps []int64 @@ -54,16 +45,11 @@ type blockResult struct { } func (br *blockResult) reset() { - br.bs = nil - br.bm = nil - br.a.reset() clear(br.valuesBuf) br.valuesBuf = br.valuesBuf[:0] - br.streamID.reset() - br.timestamps = br.timestamps[:0] br.csBufOffset = 0 @@ -81,9 +67,6 @@ func (br *blockResult) reset() { func (br *blockResult) clone() *blockResult { brNew := &blockResult{} - brNew.bs = br.bs - brNew.bm = br.bm - cs := br.getColumns() bufLen := 0 @@ -98,8 +81,6 @@ func (br *blockResult) clone() *blockResult { } brNew.valuesBuf = make([]string, 0, valuesBufLen) - brNew.streamID = br.streamID - brNew.timestamps = make([]int64, len(br.timestamps)) copy(brNew.timestamps, br.timestamps) @@ -112,6 +93,55 @@ func (br *blockResult) clone() *blockResult { return brNew } +// initFromNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits in bm. +// +// The br is valid until brSrc is reset or bm is updated. +func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { + br.reset() + + srcTimestamps := brSrc.timestamps + dstTimestamps := br.timestamps[:0] + bm.forEachSetBitReadonly(func(idx int) { + dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) + }) + br.timestamps = dstTimestamps + + for _, neededColumn := range neededColumns { + cSrc := brSrc.getColumnByName(neededColumn) + + cDst := blockResultColumn{ + name: cSrc.name, + } + + if cSrc.isConst { + cDst.isConst = true + cDst.valuesEncoded = cSrc.valuesEncoded + } else if cSrc.isTime { + cDst.isTime = true + } else { + cDst.valueType = cSrc.valueType + cDst.minValue = cSrc.minValue + cDst.maxValue = cSrc.maxValue + cDst.dictValues = cSrc.dictValues + cDst.newValuesEncodedFunc = func(br *blockResult) []string { + valuesEncodedSrc := cSrc.getValuesEncoded(brSrc) + + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + bm.forEachSetBitReadonly(func(idx int) { + valuesBuf = append(valuesBuf, valuesEncodedSrc[idx]) + }) + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] + } + } + + br.csBuf = append(br.csBuf, cDst) + br.csInitialized = false + } +} + // cloneValues clones the given values into br and returns the cloned values. func (br *blockResult) cloneValues(values []string) []string { valuesBufLen := len(br.valuesBuf) @@ -176,9 +206,10 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { br.csInitialized = false } -func (br *blockResult) initAllColumns() { - bs := br.bs - +// initAllColumns initializes all the columns in br according to bs and bm. +// +// The initialized columns are valid until bs and bm are changed. +func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { unneededColumnNames := bs.bsw.so.unneededColumnNames if !slices.Contains(unneededColumnNames, "_time") { @@ -201,7 +232,7 @@ func (br *blockResult) initAllColumns() { if v != "" { br.addConstColumn("_msg", v) } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.addColumn(ch) + br.addColumn(bs, bm, ch) } else { br.addConstColumn("_msg", "") } @@ -225,14 +256,15 @@ func (br *blockResult) initAllColumns() { continue } if !slices.Contains(unneededColumnNames, ch.name) { - br.addColumn(ch) + br.addColumn(bs, bm, ch) } } } -func (br *blockResult) initRequestedColumns() { - bs := br.bs - +// initRequestedColumns initializes only the requested columns in br according to bs and bm. +// +// The initialized columns are valid until bs and bm are changed. 
+func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) { for _, columnName := range bs.bsw.so.neededColumnNames { switch columnName { case "_stream": @@ -248,7 +280,7 @@ func (br *blockResult) initRequestedColumns() { if v != "" { br.addConstColumn(columnName, v) } else if ch := bs.csh.getColumnHeader(columnName); ch != nil { - br.addColumn(ch) + br.addColumn(bs, bm, ch) } else { br.addConstColumn(columnName, "") } @@ -259,10 +291,6 @@ func (br *blockResult) initRequestedColumns() { func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.reset() - br.bs = bs - br.bm = bm - br.streamID = bs.bsw.bh.streamID - if bm.isZero() { // Nothing to initialize for zero matching log entries in the block. return @@ -294,21 +322,10 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.timestamps = dstTimestamps } -func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string { - if c.isConst { - logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for const column") - } - if c.isTime { - logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for time column") - } - +func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string { valuesBufLen := len(br.valuesBuf) - bs := br.bs - bm := br.bm - ch := &c.ch - - switch c.valueType { + switch ch.valueType { case valueTypeString: visitValuesReadonly(bs, ch, bm, br.addValue) case valueTypeDict: @@ -317,8 +334,8 @@ func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v)) } dictIdx := v[0] - if int(dictIdx) >= len(c.dictValues) { - logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(c.dictValues)) + if int(dictIdx) >= len(ch.valuesDict.values) { + logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values)) } br.addValue(v) }) @@ -380,18 +397,19 @@ func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string // addColumn adds column for the given ch to br. // -// The added column is valid until ch is changed. -func (br *blockResult) addColumn(ch *columnHeader) { +// The added column is valid until bs, bm or ch is changed. +func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) { br.csBuf = append(br.csBuf, blockResultColumn{ name: getCanonicalColumnName(ch.name), - ch: *ch, valueType: ch.valueType, + minValue: ch.minValue, + maxValue: ch.maxValue, dictValues: ch.valuesDict.values, + newValuesEncodedFunc: func(br *blockResult) []string { + return br.newValuesEncodedFromColumnHeader(bs, bm, ch) + }, }) br.csInitialized = false - - c := &br.csBuf[len(br.csBuf)-1] - c.ch.valuesDict.values = nil } func (br *blockResult) addTimeColumn() { @@ -406,7 +424,8 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool { bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], &br.streamID) + streamID := &bs.bsw.bh.streamID + bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID) if len(bb.B) == 0 { // Couldn't find stream tags by streamID. This may be the case when the corresponding log stream // was recently registered and its tags aren't visible to search yet. 
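Aside on the core refactor in block_result.go above: blockResult no longer stores bs/bm references; instead, addColumn and initFromNeededColumns capture them in a newValuesEncodedFunc closure, so encoded column values are only materialized on the first getValuesEncoded call. A minimal, self-contained sketch of this lazy-decode pattern with toy types (column, getValues and the keep slice are illustrative, not the actual VictoriaMetrics structs):

package main

import "fmt"

// column defers materializing its values until first use, mirroring the
// newValuesEncodedFunc approach introduced in blockResultColumn above.
type column struct {
	values    []string        // cached decoded values
	newValues func() []string // set for lazily-decoded columns
}

func (c *column) getValues() []string {
	if c.values == nil {
		c.values = c.newValues() // decode once; later calls hit the cache
	}
	return c.values
}

func main() {
	encoded := []string{"a", "b", "c"}
	keep := []int{0, 2} // stands in for the set bits of a bitmap

	c := &column{
		newValues: func() []string {
			// Copy only the rows selected by the captured "bitmap",
			// like the closure built in initFromNeededColumns.
			out := make([]string, 0, len(keep))
			for _, idx := range keep {
				out = append(out, encoded[idx])
			}
			return out
		},
	}

	fmt.Println(c.getValues()) // [a c] - decoded on first access
	fmt.Println(c.getValues()) // cached - the closure isn't invoked again
}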
@@ -1340,14 +1359,9 @@ func (br *blockResult) truncateRows(keepRows int) { // blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult. // This simplifies copying, resetting and re-using of the struct. type blockResultColumn struct { - // name is column name. + // name is column name name string - // ch is is used for initializing valuesEncoded for non-time and non-const columns. - // - // ch.valuesDict.values must be set to nil, since dict values for valueTypeDict are stored at dictValues. - ch columnHeader - // isConst is set to true if the column is const. // // The column value is stored in valuesEncoded[0] @@ -1361,6 +1375,16 @@ type blockResultColumn struct { // valueType is the type of non-cost value valueType valueType + // minValue is the minimum encoded value for uint*, ipv4, timestamp and float64 value + // + // It is used for fast detection of whether the given column contains values in the given range + minValue uint64 + + // maxValue is the maximum encoded value for uint*, ipv4, timestamp and float64 value + // + // It is used for fast detection of whether the given column contains values in the given range + maxValue uint64 + // dictValues contains dict values for valueType=valueTypeDict. dictValues []string @@ -1373,6 +1397,11 @@ type blockResultColumn struct { // valuesBucketed contains values after getValuesBucketed() call valuesBucketed []string + // newValuesEncodedFunc must return valuesEncoded. + // + // This func must be set for non-const and non-time columns if valuesEncoded field isn't set. + newValuesEncodedFunc func(br *blockResult) []string + // bucketSizeStr contains bucketSizeStr for valuesBucketed bucketSizeStr string @@ -1388,12 +1417,11 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { cNew.name = br.a.copyString(c.name) - cNew.ch = c.ch - cNew.ch.valuesDict.values = nil - cNew.isConst = c.isConst cNew.isTime = c.isTime cNew.valueType = c.valueType + cNew.minValue = c.minValue + cNew.maxValue = c.maxValue cNew.dictValues = br.cloneValues(c.dictValues) cNew.valuesEncoded = br.cloneValues(c.valuesEncoded) @@ -1402,6 +1430,8 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { } cNew.valuesBucketed = br.cloneValues(c.valuesBucketed) + cNew.newValuesEncodedFunc = c.newValuesEncodedFunc + cNew.bucketSizeStr = c.bucketSizeStr cNew.bucketOffsetStr = c.bucketOffsetStr @@ -1492,11 +1522,12 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string { if c.isTime { return nil } + if values := c.valuesEncoded; values != nil { return values } - c.valuesEncoded = br.newValuesEncodedForColumn(c) + c.valuesEncoded = c.newValuesEncodedFunc(br) return c.valuesEncoded } diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 34e7d6329..4e2710610 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -167,9 +167,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) { // fetch the requested columns to bs.br. 
if bs.bsw.so.needAllColumns { - bs.br.initAllColumns() + bs.br.initAllColumns(bs, bm) } else { - bs.br.initRequestedColumns() + bs.br.initRequestedColumns(bs, bm) } } diff --git a/lib/logstorage/fields_set.go b/lib/logstorage/fields_set.go index e8d364a30..00f803fee 100644 --- a/lib/logstorage/fields_set.go +++ b/lib/logstorage/fields_set.go @@ -37,12 +37,10 @@ func (fs fieldsSet) getAll() []string { return a } -func (fs fieldsSet) contains(field string) bool { - _, ok := fs[field] - if !ok { - _, ok = fs["*"] +func (fs fieldsSet) addAll(fields []string) { + for _, f := range fields { + fs.add(f) } - return ok } func (fs fieldsSet) removeAll(fields []string) { @@ -51,22 +49,30 @@ func (fs fieldsSet) removeAll(fields []string) { } } +func (fs fieldsSet) contains(field string) bool { + if field == "" { + field = "_msg" + } + _, ok := fs[field] + if !ok { + _, ok = fs["*"] + } + return ok +} + func (fs fieldsSet) remove(field string) { if field == "*" { fs.reset() return } if !fs.contains("*") { + if field == "" { + field = "_msg" + } delete(fs, field) } } -func (fs fieldsSet) addAll(fields []string) { - for _, f := range fields { - fs.add(f) - } -} - func (fs fieldsSet) add(field string) { if fs.contains("*") { return @@ -76,5 +82,8 @@ func (fs fieldsSet) add(field string) { fs["*"] = struct{}{} return } + if field == "" { + field = "_msg" + } fs[field] = struct{}{} } diff --git a/lib/logstorage/fields_set_test.go b/lib/logstorage/fields_set_test.go index 73b39f8af..6e848ac13 100644 --- a/lib/logstorage/fields_set_test.go +++ b/lib/logstorage/fields_set_test.go @@ -17,9 +17,10 @@ func TestFieldsSet(t *testing.T) { } fs.add("foo") fs.add("bar") + fs.add("") s := fs.String() - if s != "[bar,foo]" { - t.Fatalf("unexpected String() result; got %s; want %s", s, "[bar,foo]") + if s != "[_msg,bar,foo]" { + t.Fatalf("unexpected String() result; got %s; want %s", s, "[_msg,bar,foo]") } if !fs.contains("foo") { t.Fatalf("fs must contain foo") @@ -27,6 +28,12 @@ func TestFieldsSet(t *testing.T) { if !fs.contains("bar") { t.Fatalf("fs must contain bar") } + if !fs.contains("") { + t.Fatalf("fs must contain _msg") + } + if !fs.contains("_msg") { + t.Fatalf("fs must contain _msg") + } if fs.contains("baz") { t.Fatalf("fs musn't contain baz") } @@ -41,6 +48,13 @@ func TestFieldsSet(t *testing.T) { if fs.contains("bar") { t.Fatalf("fs mustn't contain bar") } + fs.remove("") + if fs.contains("") { + t.Fatalf("fs mustn't contain _msg") + } + if fs.contains("_msg") { + t.Fatalf("fs mustn't contain _msg") + } // verify * fs.add("*") @@ -60,17 +74,17 @@ func TestFieldsSet(t *testing.T) { } // verify addAll, getAll, removeAll - fs.addAll([]string{"foo", "bar"}) - if !fs.contains("foo") || !fs.contains("bar") { - t.Fatalf("fs must contain foo and bar") + fs.addAll([]string{"foo", "bar", "_msg"}) + if !fs.contains("foo") || !fs.contains("bar") || !fs.contains("_msg") { + t.Fatalf("fs must contain foo, bar and _msg") } a := fs.getAll() - if !reflect.DeepEqual(a, []string{"bar", "foo"}) { - t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"bar", "foo"}) + if !reflect.DeepEqual(a, []string{"_msg", "bar", "foo"}) { + t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"_msg", "bar", "foo"}) } - fs.removeAll([]string{"bar", "baz"}) - if fs.contains("bar") || fs.contains("baz") { - t.Fatalf("fs mustn't contain bar and baz") + fs.removeAll([]string{"bar", "baz", "_msg"}) + if fs.contains("bar") || fs.contains("baz") || fs.contains("_msg") { + t.Fatalf("fs mustn't contain bar, 
baz and _msg") } if !fs.contains("foo") { t.Fatalf("fs must contain foo") diff --git a/lib/logstorage/filter.go b/lib/logstorage/filter.go index 809c17893..11a30ab93 100644 --- a/lib/logstorage/filter.go +++ b/lib/logstorage/filter.go @@ -5,6 +5,12 @@ type filter interface { // String returns string representation of the filter String() string + // updateNeededFields must update neededFields with the fields needed for the filter + updateNeededFields(neededFields fieldsSet) + // apply must update bm according to the filter applied to the given bs block apply(bs *blockSearch, bm *bitmap) + + // applyToBlockResult must update bm according to the filter applied to the given br block + applyToBlockResult(br *blockResult, bm *bitmap) } diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index fd5765004..1528bf797 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -31,6 +31,23 @@ func (fa *filterAnd) String() string { return strings.Join(a, " ") } +func (fa *filterAnd) updateNeededFields(neededFields fieldsSet) { + for _, f := range fa.filters { + f.updateNeededFields(neededFields) + } +} + +func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) { + for _, f := range fa.filters { + f.applyToBlockResult(br, bm) + if bm.isZero() { + // Shortcut - there is no need to apply the remaining filters, + // since the result will be zero anyway. + return + } + } +} + func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { if !fa.matchMessageBloomFilter(bs) { // Fast path - fa doesn't match _msg bloom filter. diff --git a/lib/logstorage/filter_any_case_phrase.go b/lib/logstorage/filter_any_case_phrase.go index 6498c651b..af9cefd97 100644 --- a/lib/logstorage/filter_any_case_phrase.go +++ b/lib/logstorage/filter_any_case_phrase.go @@ -29,6 +29,10 @@ func (fp *filterAnyCasePhrase) String() string { return fmt.Sprintf("%si(%s)", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.phrase)) } +func (fp *filterAnyCasePhrase) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fp.fieldName) +} + func (fp *filterAnyCasePhrase) getTokens() []string { fp.tokensOnce.Do(fp.initTokens) return fp.tokens @@ -47,6 +51,11 @@ func (fp *filterAnyCasePhrase) initPhraseLowercase() { fp.phraseLowercase = strings.ToLower(fp.phrase) } +func (fp *filterAnyCasePhrase) applyToBlockResult(br *blockResult, bm *bitmap) { + phraseLowercase := fp.getPhraseLowercase() + applyToBlockResultGeneric(br, bm, fp.fieldName, phraseLowercase, matchAnyCasePhrase) +} + func (fp *filterAnyCasePhrase) apply(bs *blockSearch, bm *bitmap) { fieldName := fp.fieldName phraseLowercase := fp.getPhraseLowercase() @@ -100,10 +109,12 @@ func (fp *filterAnyCasePhrase) apply(bs *blockSearch, bm *bitmap) { func matchValuesDictByAnyCasePhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phraseLowercase string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchAnyCasePhrase(v, phraseLowercase) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_any_case_prefix.go b/lib/logstorage/filter_any_case_prefix.go index fd89d43b6..86f413e67 100644 --- a/lib/logstorage/filter_any_case_prefix.go +++ b/lib/logstorage/filter_any_case_prefix.go @@ -33,6 +33,10 @@ func (fp *filterAnyCasePrefix) String() string { return fmt.Sprintf("%si(%s*)", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.prefix)) 
} +func (fp *filterAnyCasePrefix) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fp.fieldName) +} + func (fp *filterAnyCasePrefix) getTokens() []string { fp.tokensOnce.Do(fp.initTokens) return fp.tokens @@ -51,6 +55,11 @@ func (fp *filterAnyCasePrefix) initPrefixLowercase() { fp.prefixLowercase = strings.ToLower(fp.prefix) } +func (fp *filterAnyCasePrefix) applyToBlockResult(br *blockResult, bm *bitmap) { + prefixLowercase := fp.getPrefixLowercase() + applyToBlockResultGeneric(br, bm, fp.fieldName, prefixLowercase, matchAnyCasePrefix) +} + func (fp *filterAnyCasePrefix) apply(bs *blockSearch, bm *bitmap) { fieldName := fp.fieldName prefixLowercase := fp.getPrefixLowercase() @@ -101,10 +110,12 @@ func (fp *filterAnyCasePrefix) apply(bs *blockSearch, bm *bitmap) { func matchValuesDictByAnyCasePrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefixLowercase string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchAnyCasePrefix(v, prefixLowercase) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_exact.go b/lib/logstorage/filter_exact.go index f4cf1de01..11e749e85 100644 --- a/lib/logstorage/filter_exact.go +++ b/lib/logstorage/filter_exact.go @@ -24,6 +24,10 @@ func (fe *filterExact) String() string { return fmt.Sprintf("%sexact(%s)", quoteFieldNameIfNeeded(fe.fieldName), quoteTokenIfNeeded(fe.value)) } +func (fe *filterExact) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fe.fieldName) +} + func (fe *filterExact) getTokens() []string { fe.tokensOnce.Do(fe.initTokens) return fe.tokens @@ -33,6 +37,132 @@ func (fe *filterExact) initTokens() { fe.tokens = tokenizeStrings(nil, []string{fe.value}) } +func (fe *filterExact) applyToBlockResult(br *blockResult, bm *bitmap) { + value := fe.value + + c := br.getColumnByName(fe.fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if v != value { + bm.resetBits() + } + return + } + if c.isTime { + matchColumnByExactValue(br, bm, c, value) + return + } + + switch c.valueType { + case valueTypeString: + matchColumnByExactValue(br, bm, c, value) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if v == value { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + n, ok := tryParseUint64(value) + if !ok || n >= (1<<8) { + bm.resetBits() + return + } + nNeeded := uint8(n) + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := unmarshalUint8(valuesEncoded[idx]) + return n == nNeeded + }) + case valueTypeUint16: + n, ok := tryParseUint64(value) + if !ok || n >= (1<<16) { + bm.resetBits() + return + } + nNeeded := uint16(n) + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := unmarshalUint16(valuesEncoded[idx]) + return n == nNeeded + }) + case valueTypeUint32: + n, ok := tryParseUint64(value) + if !ok || n >= (1<<32) { + bm.resetBits() + return + } + nNeeded := uint32(n) + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := unmarshalUint32(valuesEncoded[idx]) + return n == nNeeded + }) + case valueTypeUint64: + nNeeded, ok := tryParseUint64(value) + if !ok { + bm.resetBits() + return + } + valuesEncoded := 
c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := unmarshalUint64(valuesEncoded[idx]) + return n == nNeeded + }) + case valueTypeFloat64: + fNeeded, ok := tryParseFloat64(value) + if !ok { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + f := unmarshalFloat64(valuesEncoded[idx]) + return f == fNeeded + }) + case valueTypeIPv4: + ipNeeded, ok := tryParseIPv4(value) + if !ok { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + ip := unmarshalIPv4(valuesEncoded[idx]) + return ip == ipNeeded + }) + case valueTypeTimestampISO8601: + timestampNeeded, ok := tryParseTimestampISO8601(value) + if !ok { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + timestamp := unmarshalTimestampISO8601(valuesEncoded[idx]) + return timestamp == timestampNeeded + }) + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + +func matchColumnByExactValue(br *blockResult, bm *bitmap, c *blockResultColumn, value string) { + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + return values[idx] == value + }) +} + func (fe *filterExact) apply(bs *blockSearch, bm *bitmap) { fieldName := fe.fieldName value := fe.value @@ -121,10 +251,12 @@ func matchFloat64ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, val func matchValuesDictByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if v == value { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_exact_prefix.go b/lib/logstorage/filter_exact_prefix.go index 1fa6475c4..13d7684f4 100644 --- a/lib/logstorage/filter_exact_prefix.go +++ b/lib/logstorage/filter_exact_prefix.go @@ -23,6 +23,10 @@ func (fep *filterExactPrefix) String() string { return fmt.Sprintf("%sexact(%s*)", quoteFieldNameIfNeeded(fep.fieldName), quoteTokenIfNeeded(fep.prefix)) } +func (fep *filterExactPrefix) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fep.fieldName) +} + func (fep *filterExactPrefix) getTokens() []string { fep.tokensOnce.Do(fep.initTokens) return fep.tokens @@ -32,6 +36,10 @@ func (fep *filterExactPrefix) initTokens() { fep.tokens = getTokensSkipLast(fep.prefix) } +func (fep *filterExactPrefix) applyToBlockResult(br *blockResult, bm *bitmap) { + applyToBlockResultGeneric(br, bm, fep.fieldName, fep.prefix, matchExactPrefix) +} + func (fep *filterExactPrefix) apply(bs *blockSearch, bm *bitmap) { fieldName := fep.fieldName prefix := fep.prefix @@ -134,10 +142,12 @@ func matchFloat64ByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, pr func matchValuesDictByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchExactPrefix(v, prefix) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go index 57177dbac..67dfd4fc5 100644 --- a/lib/logstorage/filter_in.go +++ b/lib/logstorage/filter_in.go @@ -55,6 +55,10 @@ func (fi *filterIn) String() string { return fmt.Sprintf("%sin(%s)", 
quoteFieldNameIfNeeded(fi.fieldName), strings.Join(a, ",")) } +func (fi *filterIn) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fi.fieldName) +} + func (fi *filterIn) getTokenSets() [][]string { fi.tokenSetsOnce.Do(fi.initTokenSets) return fi.tokenSets @@ -249,6 +253,94 @@ func (fi *filterIn) initTimestampISO8601Values() { fi.timestampISO8601Values = m } +func (fi *filterIn) applyToBlockResult(br *blockResult, bm *bitmap) { + if len(fi.values) == 0 { + bm.resetBits() + return + } + + c := br.getColumnByName(fi.fieldName) + if c.isConst { + stringValues := fi.getStringValues() + v := c.valuesEncoded[0] + if _, ok := stringValues[v]; !ok { + bm.resetBits() + } + return + } + if c.isTime { + fi.matchColumnByStringValues(br, bm, c) + return + } + + switch c.valueType { + case valueTypeString: + fi.matchColumnByStringValues(br, bm, c) + case valueTypeDict: + stringValues := fi.getStringValues() + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if _, ok := stringValues[v]; ok { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + binValues := fi.getUint8Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeUint16: + binValues := fi.getUint16Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeUint32: + binValues := fi.getUint32Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeUint64: + binValues := fi.getUint64Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeFloat64: + binValues := fi.getFloat64Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeIPv4: + binValues := fi.getIPv4Values() + matchColumnByBinValues(br, bm, c, binValues) + case valueTypeTimestampISO8601: + binValues := fi.getTimestampISO8601Values() + matchColumnByBinValues(br, bm, c, binValues) + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + +func (fi *filterIn) matchColumnByStringValues(br *blockResult, bm *bitmap, c *blockResultColumn) { + stringValues := fi.getStringValues() + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + _, ok := stringValues[v] + return ok + }) +} + +func matchColumnByBinValues(br *blockResult, bm *bitmap, c *blockResultColumn, binValues map[string]struct{}) { + if len(binValues) == 0 { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + _, ok := binValues[v] + return ok + }) +} + func (fi *filterIn) apply(bs *blockSearch, bm *bitmap) { fieldName := fi.fieldName @@ -314,6 +406,10 @@ func (fi *filterIn) apply(bs *blockSearch, bm *bitmap) { } func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) { + if len(values) == 0 { + bm.resetBits() + return + } if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) { bm.resetBits() return @@ -344,10 +440,12 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [] func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if _, ok := values[v]; ok { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) 
bbPool.Put(bb) diff --git a/lib/logstorage/filter_ipv4_range.go b/lib/logstorage/filter_ipv4_range.go index c952eb1da..ef774a78f 100644 --- a/lib/logstorage/filter_ipv4_range.go +++ b/lib/logstorage/filter_ipv4_range.go @@ -21,6 +21,77 @@ func (fr *filterIPv4Range) String() string { return fmt.Sprintf("%sipv4_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), minValue, maxValue) } +func (fr *filterIPv4Range) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fr.fieldName) +} + +func (fr *filterIPv4Range) applyToBlockResult(br *blockResult, bm *bitmap) { + minValue := fr.minValue + maxValue := fr.maxValue + + if minValue > maxValue { + bm.resetBits() + return + } + + c := br.getColumnByName(fr.fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if !matchIPv4Range(v, minValue, maxValue) { + bm.resetBits() + } + return + } + if c.isTime { + bm.resetBits() + return + } + + switch c.valueType { + case valueTypeString: + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return matchIPv4Range(v, minValue, maxValue) + }) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if matchIPv4Range(v, minValue, maxValue) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + bm.resetBits() + case valueTypeUint16: + bm.resetBits() + case valueTypeUint32: + bm.resetBits() + case valueTypeUint64: + bm.resetBits() + case valueTypeFloat64: + bm.resetBits() + case valueTypeIPv4: + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + ip := unmarshalIPv4(valuesEncoded[idx]) + return ip >= minValue && ip <= maxValue + }) + case valueTypeTimestampISO8601: + bm.resetBits() + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + func (fr *filterIPv4Range) apply(bs *blockSearch, bm *bitmap) { fieldName := fr.fieldName minValue := fr.minValue maxValue := fr.maxValue @@ -73,10 +144,12 @@ func (fr *filterIPv4Range) apply(bs *blockSearch, bm *bitmap) { func matchValuesDictByIPv4Range(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue uint32) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchIPv4Range(v, minValue, maxValue) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_len_range.go b/lib/logstorage/filter_len_range.go index b372d7b0e..3820cf394 100644 --- a/lib/logstorage/filter_len_range.go +++ b/lib/logstorage/filter_len_range.go @@ -21,6 +21,101 @@ func (fr *filterLenRange) String() string { return quoteFieldNameIfNeeded(fr.fieldName) + "len_range" + fr.stringRepr } +func (fr *filterLenRange) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fr.fieldName) +} + +func (fr *filterLenRange) applyToBlockResult(br *blockResult, bm *bitmap) { + minLen := fr.minLen + maxLen := fr.maxLen + + if minLen > maxLen { + bm.resetBits() + return + } + + c := br.getColumnByName(fr.fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if !matchLenRange(v, minLen, maxLen) { + bm.resetBits() + } + return + } + if c.isTime { + matchColumnByLenRange(br, bm, c, minLen, maxLen) + return + } + + switch c.valueType { + case valueTypeString: + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeDict: + bb := bbPool.Get() + for _, v 
:= range c.dictValues { + c := byte(0) + if matchLenRange(v, minLen, maxLen) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + if minLen > 3 || maxLen == 0 { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeUint16: + if minLen > 5 || maxLen == 0 { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeUint32: + if minLen > 10 || maxLen == 0 { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeUint64: + if minLen > 20 || maxLen == 0 { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeFloat64: + if minLen > 24 || maxLen == 0 { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeIPv4: + if minLen > uint64(len("255.255.255.255")) || maxLen < uint64(len("0.0.0.0")) { + bm.resetBits() + return + } + matchColumnByLenRange(br, bm, c, minLen, maxLen) + case valueTypeTimestampISO8601: + matchTimestampISO8601ByLenRange(bm, minLen, maxLen) + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + +func matchColumnByLenRange(br *blockResult, bm *bitmap, c *blockResultColumn, minLen, maxLen uint64) { + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return matchLenRange(v, minLen, maxLen) + }) +} + func (fr *filterLenRange) apply(bs *blockSearch, bm *bitmap) { fieldName := fr.fieldName minLen := fr.minLen @@ -110,10 +204,12 @@ func matchFloat64ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLe func matchValuesDictByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchLenRange(v, minLen, maxLen) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) @@ -126,6 +222,10 @@ func matchStringByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen } func matchUint8ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) { + if minLen > 3 || maxLen == 0 { + bm.resetBits() + return + } if !matchMinMaxValueLen(ch, minLen, maxLen) { bm.resetBits() return @@ -140,6 +240,10 @@ func matchUint8ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, } func matchUint16ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) { + if minLen > 5 || maxLen == 0 { + bm.resetBits() + return + } if !matchMinMaxValueLen(ch, minLen, maxLen) { bm.resetBits() return @@ -154,6 +258,10 @@ func matchUint16ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen } func matchUint32ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) { + if minLen > 10 || maxLen == 0 { + bm.resetBits() + return + } if !matchMinMaxValueLen(ch, minLen, maxLen) { bm.resetBits() return @@ -168,6 +276,10 @@ func matchUint32ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen } func matchUint64ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) { + if minLen > 20 || maxLen == 0 { + bm.resetBits() + return + } if !matchMinMaxValueLen(ch, minLen, maxLen) { bm.resetBits() return diff --git a/lib/logstorage/filter_noop.go 
b/lib/logstorage/filter_noop.go index d9bbb5775..22718e537 100644 --- a/lib/logstorage/filter_noop.go +++ b/lib/logstorage/filter_noop.go @@ -8,6 +8,14 @@ func (fn *filterNoop) String() string { return "" } +func (fn *filterNoop) updateNeededFields(neededFields fieldsSet) { + // nothing to do +} + +func (fn *filterNoop) applyToBlockResult(_ *blockResult, _ *bitmap) { + // nothing to do +} + func (fn *filterNoop) apply(_ *blockSearch, _ *bitmap) { // nothing to do } diff --git a/lib/logstorage/filter_not.go b/lib/logstorage/filter_not.go index 37b36f2e7..292763033 100644 --- a/lib/logstorage/filter_not.go +++ b/lib/logstorage/filter_not.go @@ -16,6 +16,20 @@ func (fn *filterNot) String() string { return "!" + s } +func (fn *filterNot) updateNeededFields(neededFields fieldsSet) { + fn.f.updateNeededFields(neededFields) +} + +func (fn *filterNot) applyToBlockResult(br *blockResult, bm *bitmap) { + // Minimize the number of rows to check by the filter by applying it + // only to the rows, which match the bm, e.g. they may change the bm result. + bmTmp := getBitmap(bm.bitsLen) + bmTmp.copyFrom(bm) + fn.f.applyToBlockResult(br, bmTmp) + bm.andNot(bmTmp) + putBitmap(bmTmp) +} + func (fn *filterNot) apply(bs *blockSearch, bm *bitmap) { // Minimize the number of rows to check by the filter by applying it // only to the rows, which match the bm, e.g. they may change the bm result. diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go index 564fa5618..0ec3e7c83 100644 --- a/lib/logstorage/filter_or.go +++ b/lib/logstorage/filter_or.go @@ -21,6 +21,35 @@ func (fo *filterOr) String() string { return strings.Join(a, " or ") } +func (fo *filterOr) updateNeededFields(neededFields fieldsSet) { + for _, f := range fo.filters { + f.updateNeededFields(neededFields) + } +} + +func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) { + bmResult := getBitmap(bm.bitsLen) + bmTmp := getBitmap(bm.bitsLen) + for _, f := range fo.filters { + // Minimize the number of rows to check by the filter by checking only + // the rows, which may change the output bm: + // - bm matches them, e.g. the caller wants to get them + // - bmResult doesn't match them, e.g. all the previous OR filters didn't match them + bmTmp.copyFrom(bm) + bmTmp.andNot(bmResult) + if bmTmp.isZero() { + // Shortcut - there is no need to apply the remaining filters, + // since the result already matches all the values from the block. 
+ break + } + f.applyToBlockResult(br, bmTmp) + bmResult.or(bmTmp) + } + putBitmap(bmTmp) + bm.copyFrom(bmResult) + putBitmap(bmResult) +} + func (fo *filterOr) apply(bs *blockSearch, bm *bitmap) { bmResult := getBitmap(bm.bitsLen) bmTmp := getBitmap(bm.bitsLen) diff --git a/lib/logstorage/filter_phrase.go b/lib/logstorage/filter_phrase.go index a2dbd996b..76658c5b3 100644 --- a/lib/logstorage/filter_phrase.go +++ b/lib/logstorage/filter_phrase.go @@ -32,6 +32,10 @@ func (fp *filterPhrase) String() string { return quoteFieldNameIfNeeded(fp.fieldName) + quoteTokenIfNeeded(fp.phrase) } +func (fp *filterPhrase) updateNeededFields(neededColumns fieldsSet) { + neededColumns.add(fp.fieldName) +} + func (fp *filterPhrase) getTokens() []string { fp.tokensOnce.Do(fp.initTokens) return fp.tokens @@ -41,6 +45,10 @@ func (fp *filterPhrase) initTokens() { fp.tokens = tokenizeStrings(nil, []string{fp.phrase}) } +func (fp *filterPhrase) applyToBlockResult(br *blockResult, bm *bitmap) { + applyToBlockResultGeneric(br, bm, fp.fieldName, fp.phrase, matchPhrase) +} + func (fp *filterPhrase) apply(bs *blockSearch, bm *bitmap) { fieldName := fp.fieldName phrase := fp.phrase @@ -168,10 +176,12 @@ func matchFloat64ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase func matchValuesDictByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchPhrase(v, phrase) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) @@ -249,7 +259,7 @@ func getPhrasePos(s, phrase string) int { } func matchEncodedValuesDict(bs *blockSearch, ch *columnHeader, bm *bitmap, encodedValues []byte) { - if len(encodedValues) == 0 { + if bytes.IndexByte(encodedValues, 1) < 0 { // Fast path - the phrase is missing in the valuesDict bm.resetBits() return @@ -259,8 +269,11 @@ func matchEncodedValuesDict(bs *blockSearch, ch *columnHeader, bm *bitmap, encod if len(v) != 1 { logger.Panicf("FATAL: %s: unexpected length for dict value: got %d; want 1", bs.partPath(), len(v)) } - n := bytes.IndexByte(encodedValues, v[0]) - return n >= 0 + idx := v[0] + if int(idx) >= len(encodedValues) { + logger.Panicf("FATAL: %s: too big index for dict value; got %d; must be smaller than %d", bs.partPath(), idx, len(encodedValues)) + } + return encodedValues[idx] == 1 }) } @@ -320,3 +333,81 @@ func toTimestampISO8601String(bs *blockSearch, bb *bytesutil.ByteBuffer, v strin bb.B = marshalTimestampISO8601String(bb.B[:0], timestamp) return bytesutil.ToUnsafeString(bb.B) } + +func applyToBlockResultGeneric(br *blockResult, bm *bitmap, fieldName, phrase string, matchFunc func(v, phrase string) bool) { + c := br.getColumnByName(fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if !matchFunc(v, phrase) { + bm.resetBits() + } + return + } + if c.isTime { + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + return + } + + switch c.valueType { + case valueTypeString: + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if matchFunc(v, phrase) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + n, ok := tryParseUint64(phrase) + if !ok || n >= (1<<8) { + bm.resetBits() + 
return + } + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeUint16: + n, ok := tryParseUint64(phrase) + if !ok || n >= (1<<16) { + bm.resetBits() + return + } + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeUint32: + n, ok := tryParseUint64(phrase) + if !ok || n >= (1<<32) { + bm.resetBits() + return + } + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeUint64: + _, ok := tryParseUint64(phrase) + if !ok { + bm.resetBits() + return + } + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeFloat64: + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeIPv4: + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + case valueTypeTimestampISO8601: + matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc) + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + +func matchColumnByPhraseGeneric(br *blockResult, bm *bitmap, c *blockResultColumn, phrase string, matchFunc func(v, phrase string) bool) { + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + return matchFunc(values[idx], phrase) + }) +} diff --git a/lib/logstorage/filter_prefix.go b/lib/logstorage/filter_prefix.go index 953c3e2b6..853e2d5f9 100644 --- a/lib/logstorage/filter_prefix.go +++ b/lib/logstorage/filter_prefix.go @@ -30,6 +30,10 @@ func (fp *filterPrefix) String() string { return fmt.Sprintf("%s%s*", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.prefix)) } +func (fp *filterPrefix) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fp.fieldName) +} + func (fp *filterPrefix) getTokens() []string { fp.tokensOnce.Do(fp.initTokens) return fp.tokens @@ -39,6 +43,10 @@ func (fp *filterPrefix) initTokens() { fp.tokens = getTokensSkipLast(fp.prefix) } +func (fp *filterPrefix) applyToBlockResult(bs *blockResult, bm *bitmap) { + applyToBlockResultGeneric(bs, bm, fp.fieldName, fp.prefix, matchPrefix) +} + func (fp *filterPrefix) apply(bs *blockSearch, bm *bitmap) { fieldName := fp.fieldName prefix := fp.prefix @@ -158,10 +166,12 @@ func matchFloat64ByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix func matchValuesDictByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchPrefix(v, prefix) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index 29f8eb78f..851faf340 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -21,6 +21,120 @@ func (fr *filterRange) String() string { return quoteFieldNameIfNeeded(fr.fieldName) + "range" + fr.stringRepr } +func (fr *filterRange) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fr.fieldName) +} + +func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { + minValue := fr.minValue + maxValue := fr.maxValue + + if minValue > maxValue { + bm.resetBits() + return + } + + c := br.getColumnByName(fr.fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if !matchRange(v, minValue, maxValue) { + bm.resetBits() + } + return + } + if c.isTime { + bm.resetBits() + return + } + + switch c.valueType { + case valueTypeString: + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return matchRange(v, minValue, maxValue) + }) + 
case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if matchRange(v, minValue, maxValue) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + minValueUint, maxValueUint := toUint64Range(minValue, maxValue) + if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := uint64(unmarshalUint8(v)) + return n >= minValueUint && n <= maxValueUint + }) + case valueTypeUint16: + minValueUint, maxValueUint := toUint64Range(minValue, maxValue) + if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := uint64(unmarshalUint16(v)) + return n >= minValueUint && n <= maxValueUint + }) + case valueTypeUint32: + minValueUint, maxValueUint := toUint64Range(minValue, maxValue) + if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := uint64(unmarshalUint32(v)) + return n >= minValueUint && n <= maxValueUint + }) + case valueTypeUint64: + minValueUint, maxValueUint := toUint64Range(minValue, maxValue) + if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := unmarshalUint64(v) + return n >= minValueUint && n <= maxValueUint + }) + case valueTypeFloat64: + if minValue > math.Float64frombits(c.maxValue) || maxValue < math.Float64frombits(c.minValue) { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + f := unmarshalFloat64(v) + return f >= minValue && f <= maxValue + }) + case valueTypeIPv4, valueTypeTimestampISO8601: + bm.resetBits() + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + func (fr *filterRange) apply(bs *blockSearch, bm *bitmap) { fieldName := fr.fieldName minValue := fr.minValue @@ -88,10 +202,12 @@ func matchFloat64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue func matchValuesDictByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue float64) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchRange(v, minValue, maxValue) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_regexp.go b/lib/logstorage/filter_regexp.go index 990d62ecf..725bf5aed 100644 --- a/lib/logstorage/filter_regexp.go +++ b/lib/logstorage/filter_regexp.go @@ -19,6 +19,17 @@ func (fr *filterRegexp) String() string { return fmt.Sprintf("%sre(%q)", quoteFieldNameIfNeeded(fr.fieldName), fr.re.String()) } +func (fr *filterRegexp) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fr.fieldName) +} + +func (fr *filterRegexp) applyToBlockResult(br *blockResult, bm *bitmap) { + re := fr.re + applyToBlockResultGeneric(br, bm, fr.fieldName, "", func(v, _ 
string) bool { + return re.MatchString(v) + }) +} + func (fr *filterRegexp) apply(bs *blockSearch, bm *bitmap) { fieldName := fr.fieldName re := fr.re @@ -95,10 +106,12 @@ func matchFloat64ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *reg func matchValuesDictByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp.Regexp) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if re.MatchString(v) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_sequence.go b/lib/logstorage/filter_sequence.go index ff9dc5314..38c7516bc 100644 --- a/lib/logstorage/filter_sequence.go +++ b/lib/logstorage/filter_sequence.go @@ -31,6 +31,10 @@ func (fs *filterSequence) String() string { return fmt.Sprintf("%sseq(%s)", quoteFieldNameIfNeeded(fs.fieldName), strings.Join(a, ",")) } +func (fs *filterSequence) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fs.fieldName) +} + func (fs *filterSequence) getTokens() []string { fs.tokensOnce.Do(fs.initTokens) return fs.tokens @@ -58,6 +62,17 @@ func (fs *filterSequence) initNonEmptyPhrases() { fs.nonEmptyPhrases = result } +func (fs *filterSequence) applyToBlockResult(br *blockResult, bm *bitmap) { + phrases := fs.getNonEmptyPhrases() + if len(phrases) == 0 { + return + } + + applyToBlockResultGeneric(br, bm, fs.fieldName, "", func(v, _ string) bool { + return matchSequence(v, phrases) + }) +} + func (fs *filterSequence) apply(bs *blockSearch, bm *bitmap) { fieldName := fs.fieldName phrases := fs.getNonEmptyPhrases() @@ -171,10 +186,12 @@ func matchFloat64BySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phras func matchValuesDictBySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phrases []string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchSequence(v, phrases) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_stream.go b/lib/logstorage/filter_stream.go index 9618dc3a2..e93a23364 100644 --- a/lib/logstorage/filter_stream.go +++ b/lib/logstorage/filter_stream.go @@ -2,6 +2,8 @@ package logstorage import ( "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // filterStream is the filter for `_stream:{...}` @@ -27,6 +29,10 @@ func (fs *filterStream) String() string { return "_stream:" + s } +func (fs *filterStream) updateNeededFields(neededFields fieldsSet) { + neededFields.add("_stream") +} + func (fs *filterStream) getStreamIDs() map[streamID]struct{} { fs.streamIDsOnce.Do(fs.initStreamIDs) return fs.streamIDs @@ -41,6 +47,65 @@ func (fs *filterStream) initStreamIDs() { fs.streamIDs = m } +func (fs *filterStream) applyToBlockResult(br *blockResult, bm *bitmap) { + if fs.f.isEmpty() { + return + } + + c := br.getColumnByName("_stream") + if c.isConst { + v := c.valuesEncoded[0] + if !fs.f.matchStreamName(v) { + bm.resetBits() + } + return + } + if c.isTime { + bm.resetBits() + return + } + + switch c.valueType { + case valueTypeString: + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return fs.f.matchStreamName(v) + }) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if fs.f.matchStreamName(v) { + c = 1 + } + bb.B = append(bb.B, c) + } + 
valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + bm.resetBits() + case valueTypeUint16: + bm.resetBits() + case valueTypeUint32: + bm.resetBits() + case valueTypeUint64: + bm.resetBits() + case valueTypeFloat64: + bm.resetBits() + case valueTypeIPv4: + bm.resetBits() + case valueTypeTimestampISO8601: + bm.resetBits() + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + func (fs *filterStream) apply(bs *blockSearch, bm *bitmap) { if fs.f.isEmpty() { return diff --git a/lib/logstorage/filter_string_range.go b/lib/logstorage/filter_string_range.go index c360501c0..f7f5f514d 100644 --- a/lib/logstorage/filter_string_range.go +++ b/lib/logstorage/filter_string_range.go @@ -22,6 +22,24 @@ func (fr *filterStringRange) String() string { return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), quoteTokenIfNeeded(fr.minValue), quoteTokenIfNeeded(fr.maxValue)) } +func (fr *filterStringRange) updateNeededFields(neededFields fieldsSet) { + neededFields.add(fr.fieldName) +} + +func (fr *filterStringRange) applyToBlockResult(br *blockResult, bm *bitmap) { + minValue := fr.minValue + maxValue := fr.maxValue + + if minValue > maxValue { + bm.resetBits() + return + } + + applyToBlockResultGeneric(br, bm, fr.fieldName, "", func(v, _ string) bool { + return matchStringRange(v, minValue, maxValue) + }) +} + func (fr *filterStringRange) apply(bs *blockSearch, bm *bitmap) { fieldName := fr.fieldName minValue := fr.minValue @@ -117,10 +135,12 @@ func matchFloat64ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, mi func matchValuesDictByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue string) { bb := bbPool.Get() - for i, v := range ch.valuesDict.values { + for _, v := range ch.valuesDict.values { + c := byte(0) if matchStringRange(v, minValue, maxValue) { - bb.B = append(bb.B, byte(i)) + c = 1 } + bb.B = append(bb.B, c) } matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 875ecc333..e80e23ed9 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -197,11 +197,6 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi } workersCount := 3 s.search(workersCount, so, nil, func(_ uint, br *blockResult) { - // Verify tenantID - if !br.streamID.tenantID.equal(&tenantID) { - t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID) - } - // Verify columns cs := br.getColumns() if len(cs) != 1 { diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index 2cffbf4c4..f4b5bb7b5 100644 --- a/lib/logstorage/filter_time.go +++ b/lib/logstorage/filter_time.go @@ -1,5 +1,9 @@ package logstorage +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + // filterTime filters by time. // // It is expressed as `_time:(start, end]` in LogsQL. 
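Aside on the applyToBlockResult contract used by all the filters above: each implementation narrows bm in the same way, visiting the set bits and keeping a bit only while the corresponding row still matches. A rough self-contained sketch of that contract with a toy bitmap (toyBitmap and its bool-slice layout are illustrative; the real bitmap packs bits into uint64 words):

package main

import "fmt"

// toyBitmap tracks one bool per row; the real bitmap packs bits into uint64 words.
type toyBitmap struct {
	bits []bool
}

// forEachSetBit visits the set bits and clears those for which f returns false,
// which is the semantics the applyToBlockResult implementations rely on.
func (bm *toyBitmap) forEachSetBit(f func(idx int) bool) {
	for i, set := range bm.bits {
		if set && !f(i) {
			bm.bits[i] = false
		}
	}
}

func main() {
	values := []string{"error", "info", "error", "debug"}
	bm := &toyBitmap{bits: []bool{true, true, true, true}}

	// Keep only the rows whose value matches, as matchColumnByPhraseGeneric does.
	bm.forEachSetBit(func(idx int) bool {
		return values[idx] == "error"
	})

	fmt.Println(bm.bits) // [true false true false]
}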
@@ -18,6 +22,94 @@ func (ft *filterTime) String() string { return "_time:" + ft.stringRepr } +func (ft *filterTime) updateNeededFields(neededFields fieldsSet) { + neededFields.add("_time") +} + +func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) { + minTimestamp := ft.minTimestamp + maxTimestamp := ft.maxTimestamp + + if minTimestamp > maxTimestamp { + bm.resetBits() + return + } + + c := br.getColumnByName("_time") + if c.isConst { + v := c.valuesEncoded[0] + if !ft.matchTimestampString(v) { + bm.resetBits() + } + return + } + if c.isTime { + timestamps := br.timestamps + bm.forEachSetBit(func(idx int) bool { + timestamp := timestamps[idx] + return ft.matchTimestampValue(timestamp) + }) + return + } + + switch c.valueType { + case valueTypeString: + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return ft.matchTimestampString(v) + }) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if ft.matchTimestampString(v) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + case valueTypeUint8: + bm.resetBits() + case valueTypeUint16: + bm.resetBits() + case valueTypeUint32: + bm.resetBits() + case valueTypeUint64: + bm.resetBits() + case valueTypeFloat64: + bm.resetBits() + case valueTypeIPv4: + bm.resetBits() + case valueTypeTimestampISO8601: + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + timestamp := unmarshalTimestampISO8601(v) + return ft.matchTimestampValue(timestamp) + }) + default: + logger.Panicf("FATAL: unknown valueType=%d", c.valueType) + } +} + +func (ft *filterTime) matchTimestampString(v string) bool { + timestamp, ok := tryParseTimestampRFC3339Nano(v) + if !ok { + return false + } + return ft.matchTimestampValue(timestamp) +} + +func (ft *filterTime) matchTimestampValue(timestamp int64) bool { + return timestamp >= ft.minTimestamp && timestamp <= ft.maxTimestamp +} + func (ft *filterTime) apply(bs *blockSearch, bm *bitmap) { minTimestamp := ft.minTimestamp maxTimestamp := ft.maxTimestamp diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index dba66b3a1..fd90ac108 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -268,7 +268,7 @@ func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTag } return ids case "=~": - re := tf.getRegexp() + re := tf.regexp if re.MatchString("") { // (field=~"|re") => (field="" or field=~"re") ids := is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName) @@ -280,7 +280,7 @@ func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTag } return is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re) case "!~": - re := tf.getRegexp() + re := tf.regexp if re.MatchString("") { // (field!~"|re") => (field!="" and not field=~"re") ids := is.getStreamIDsForTagName(tenantID, tf.tagName) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 4ecfd7006..f6c6f23a4 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -968,6 +968,13 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats by (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`) f(`* | stats (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`) + // stats pipe with per-func 
filters
+ f(`* | stats count() if (foo bar) rows`, `* | stats count(*) if (foo bar) as rows`)
+ f(`* | stats by (_time:1d offset -2h, f2)
+ count() if (is_admin:true or _msg:"foo bar"*) as foo,
+ sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`,
+ `* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`)
+
 // sort pipe
 f(`* | sort`, `* | sort`)
 f(`* | sort desc`, `* | sort desc`)
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index f1bcc522a..64aba9a05 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -80,9 +80,12 @@ func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}
 shards := make([]pipeSortProcessorShard, workersCount)
 for i := range shards {
- shard := &shards[i]
- shard.ps = ps
- shard.stateSizeBudget = stateSizeBudgetChunk
+ shards[i] = pipeSortProcessorShard{
+ pipeSortProcessorShardNopad: pipeSortProcessorShardNopad{
+ ps: ps,
+ stateSizeBudget: stateSizeBudgetChunk,
+ },
+ }
 maxStateSize -= stateSizeBudgetChunk
 }
 
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 885d5885e..b9a211350 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -28,6 +28,12 @@ type pipeStatsFunc struct {
 // f is stats function to execute
 f statsFunc
 
+ // neededFieldsForFunc contains the fields needed for executing f
+ neededFieldsForFunc []string
+
+ // iff is an additional filter, which is applied to rows before executing f on them
+ iff filter
+
 // resultName is the name of the output generated by f
 resultName string
}
@@ -36,12 +42,12 @@ type statsFunc interface {
 // String returns string representation of statsFunc
 String() string
 
- // neededFields returns the needed fields for calculating the given stats
- neededFields() []string
+ // updateNeededFields updates neededFields with the fields needed for calculating the given stats
+ updateNeededFields(neededFields fieldsSet)
 
- // newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc.
+ // newStatsProcessor must create a new statsProcessor for calculating stats for the given statsFunc
 //
- // It also must return the size in bytes of the returned statsProcessor.
+ // It also must return the size in bytes of the returned statsProcessor newStatsProcessor() (statsProcessor, int) } @@ -82,7 +88,12 @@ func (ps *pipeStats) String() string { } a := make([]string, len(ps.funcs)) for i, f := range ps.funcs { - a[i] = f.f.String() + " as " + quoteTokenIfNeeded(f.resultName) + line := f.f.String() + if f.iff != nil { + line += " if (" + f.iff.String() + ")" + } + line += " as " + quoteTokenIfNeeded(f.resultName) + a[i] = line } s += strings.Join(a, ", ") return s @@ -97,10 +108,12 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) neededFields.add(bf.name) } - for i, f := range ps.funcs { + for _, f := range ps.funcs { if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) { - funcFields := ps.funcs[i].f.neededFields() - neededFields.addAll(funcFields) + f.f.updateNeededFields(neededFields) + if f.iff != nil { + f.iff.updateNeededFields(neededFields) + } } } @@ -113,11 +126,21 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, maxStateSize := int64(float64(memory.Allowed()) * 0.3) shards := make([]pipeStatsProcessorShard, workersCount) + funcsLen := len(ps.funcs) for i := range shards { - shard := &shards[i] - shard.ps = ps - shard.m = make(map[string]*pipeStatsGroup) - shard.stateSizeBudget = stateSizeBudgetChunk + shards[i] = pipeStatsProcessorShard{ + pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{ + ps: ps, + + m: make(map[string]*pipeStatsGroup), + + bms: make([]bitmap, funcsLen), + brs: make([]*blockResult, funcsLen), + brsBuf: make([]blockResult, funcsLen), + + stateSizeBudget: stateSizeBudgetChunk, + }, + } maxStateSize -= stateSizeBudgetChunk } @@ -157,7 +180,13 @@ type pipeStatsProcessorShard struct { type pipeStatsProcessorShardNopad struct { ps *pipeStats - m map[string]*pipeStatsGroup + + m map[string]*pipeStatsGroup + + // bms, brs and brsBuf are used for applying per-func filters. + bms []bitmap + brs []*blockResult + brsBuf []blockResult columnValues [][]string keyBuf []byte @@ -168,10 +197,14 @@ type pipeStatsProcessorShardNopad struct { func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { byFields := shard.ps.byFields + // Apply per-function filters + brs := shard.applyPerFunctionFilters(br) + + // Process stats for the defined functions if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. psg := shard.getPipeStatsGroup(nil) - shard.stateSizeBudget -= psg.updateStatsForAllRows(br) + shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) return } if len(byFields) == 1 { @@ -183,7 +216,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { v := br.getBucketedValue(c.valuesEncoded[0], bf) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) psg := shard.getPipeStatsGroup(shard.keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(br) + shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) return } @@ -192,7 +225,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Fast path for column with constant values. 
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
 psg := shard.getPipeStatsGroup(shard.keyBuf)
- shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
+ shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
 return
 }
@@ -204,7 +237,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
 keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
 psg = shard.getPipeStatsGroup(keyBuf)
 }
- shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
+ shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
 }
 shard.keyBuf = keyBuf
 return
@@ -234,7 +267,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
 keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
 }
 psg := shard.getPipeStatsGroup(keyBuf)
- shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
+ shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
 shard.keyBuf = keyBuf
 return
 }
@@ -259,11 +292,40 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
 }
 psg = shard.getPipeStatsGroup(keyBuf)
 }
- shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
+ shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
 }
 shard.keyBuf = keyBuf
}
+
+// applyPerFunctionFilters returns the blockResult each per-func stats must be fed with:
+// funcs without an 'if' filter share brSrc, while funcs with an 'if' filter get a filtered copy of it.
+func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult {
+ funcs := shard.ps.funcs
+ brs := shard.brs
+ for i := range funcs {
+ iff := funcs[i].iff
+ if iff == nil {
+ // Fast path - this function has no 'if' filter
+ brs[i] = brSrc
+ continue
+ }
+
+ bm := &shard.bms[i]
+ bm.init(len(brSrc.timestamps))
+ bm.setBits()
+ iff.applyToBlockResult(brSrc, bm)
+ if bm.areAllBitsSet() {
+ // Fast path - the 'if' filter doesn't filter out any rows
+ brs[i] = brSrc
+ continue
+ }
+
+ // Copy the rows selected by bm into brDst, keeping only the fields needed by this function
+ brDst := &shard.brsBuf[i]
+ brDst.initFromNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc)
+ brs[i] = brDst
+ }
+ return brs
+}
+
 func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
 psg := shard.m[string(key)]
 if psg != nil {
@@ -289,18 +351,18 @@ type pipeStatsGroup struct {
 sfps []statsProcessor
}
 
-func (psg *pipeStatsGroup) updateStatsForAllRows(br *blockResult) int {
+func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int {
 n := 0
- for _, sfp := range psg.sfps {
- n += sfp.updateStatsForAllRows(br)
+ for i, sfp := range psg.sfps {
+ n += sfp.updateStatsForAllRows(brs[i])
 }
 return n
}
 
-func (psg *pipeStatsGroup) updateStatsForRow(br *blockResult, rowIdx int) int {
+func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int {
 n := 0
- for _, sfp := range psg.sfps {
- n += sfp.updateStatsForRow(br, rowIdx)
+ for i, sfp := range psg.sfps {
+ n += sfp.updateStatsForRow(brs[i], rowIdx)
 }
 return n
}
@@ -454,27 +516,32 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
 var funcs []pipeStatsFunc
 for {
+ var f pipeStatsFunc
 sf, err := parseStatsFunc(lex)
 if err != nil {
 return nil, err
 }
- /*
- if lex.isKeyword("if") {
- ifQuery, err := parseIfQuery(lex)
- if err != nil {
- return fmt.Errorf("cannot parse 'if' query for %s: %w", sf, err)
- }
+ f.f = sf
+
+ neededFields := newFieldsSet()
+ f.f.updateNeededFields(neededFields)
+ f.neededFieldsForFunc = neededFields.getAll()
+
+ if lex.isKeyword("if") {
+ iff, err := parseIfFilter(lex)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", sf, err)
 }
- */
+ f.iff = iff
+ }
+
 resultName, err := parseResultName(lex)
 if err != nil {
 return nil,
fmt.Errorf("cannot parse result name for %s: %w", sf, err) } + f.resultName = resultName - funcs = append(funcs, pipeStatsFunc{ - f: sf, - resultName: resultName, - }) + funcs = append(funcs, f) if lex.isKeyword("|", ")", "") { ps.funcs = funcs @@ -487,6 +554,26 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { } } +func parseIfFilter(lex *lexer) (filter, error) { + if !lex.isKeyword("if") { + return nil, fmt.Errorf("unexpected keyword %q; expecting 'if'", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("unexpected token %q after 'if'; expecting '('", lex.token) + } + lex.nextToken() + f, err := parseFilter(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'if' filter: %w", err) + } + if !lex.isKeyword(")") { + return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token) + } + lex.nextToken() + return f, nil +} + func parseStatsFunc(lex *lexer) (statsFunc, error) { switch { case lex.isKeyword("count"): diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index 5a6a7f8db..20e2d6eb0 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -18,9 +18,12 @@ func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{} shards := make([]pipeTopkProcessorShard, workersCount) for i := range shards { - shard := &shards[i] - shard.ps = ps - shard.stateSizeBudget = stateSizeBudgetChunk + shards[i] = pipeTopkProcessorShard{ + pipeTopkProcessorShardNopad: pipeTopkProcessorShardNopad{ + ps: ps, + stateSizeBudget: stateSizeBudgetChunk, + }, + } maxStateSize -= stateSizeBudgetChunk } diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 3b2fcc66d..757b057a4 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -49,10 +49,13 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c shards := make([]pipeUniqProcessorShard, workersCount) for i := range shards { - shard := &shards[i] - shard.pu = pu - shard.m = make(map[string]struct{}) - shard.stateSizeBudget = stateSizeBudgetChunk + shards[i] = pipeUniqProcessorShard{ + pipeUniqProcessorShardNopad: pipeUniqProcessorShardNopad{ + pu: pu, + m: make(map[string]struct{}), + stateSizeBudget: stateSizeBudgetChunk, + }, + } maxStateSize -= stateSizeBudgetChunk } diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index f0afcfee8..5977a31e3 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -15,8 +15,8 @@ func (sa *statsAvg) String() string { return "avg(" + fieldNamesString(sa.fields) + ")" } -func (sa *statsAvg) neededFields() []string { - return sa.fields +func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sa.fields) } func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index dcf2be2a7..158ea0f20 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -17,12 +17,12 @@ func (sc *statsCount) String() string { return "count(" + fieldNamesString(sc.fields) + ")" } -func (sc *statsCount) neededFields() []string { +func (sc *statsCount) updateNeededFields(neededFields fieldsSet) { if sc.containsStar { // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps) - return nil + return } - return sc.fields + neededFields.addAll(sc.fields) } func (sc *statsCount) newStatsProcessor() 
(statsProcessor, int) { diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index 9904057f5..c18756a5d 100644 --- a/lib/logstorage/stats_count_empty.go +++ b/lib/logstorage/stats_count_empty.go @@ -17,8 +17,8 @@ func (sc *statsCountEmpty) String() string { return "count_empty(" + fieldNamesString(sc.fields) + ")" } -func (sc *statsCountEmpty) neededFields() []string { - return sc.fields +func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sc.fields) } func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index d4f1bc486..319aad7fc 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -24,8 +24,8 @@ func (su *statsCountUniq) String() string { return s } -func (su *statsCountUniq) neededFields() []string { - return su.fields +func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(su.fields) } func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 45289b895..8d1cea099 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -19,8 +19,8 @@ func (sm *statsMax) String() string { return "max(" + fieldNamesString(sm.fields) + ")" } -func (sm *statsMax) neededFields() []string { - return sm.fields +func (sm *statsMax) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sm.fields) } func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { @@ -124,23 +124,23 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu } case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() - bb.B = marshalUint64String(bb.B[:0], c.ch.maxValue) + bb.B = marshalUint64String(bb.B[:0], c.maxValue) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeFloat64: - f := math.Float64frombits(c.ch.maxValue) + f := math.Float64frombits(c.maxValue) bb := bbPool.Get() bb.B = marshalFloat64String(bb.B[:0], f) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeIPv4: bb := bbPool.Get() - bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.maxValue)) + bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeTimestampISO8601: bb := bbPool.Get() - bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.maxValue)) + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) default: diff --git a/lib/logstorage/stats_median.go b/lib/logstorage/stats_median.go index 30cc3f1b1..ac60844a3 100644 --- a/lib/logstorage/stats_median.go +++ b/lib/logstorage/stats_median.go @@ -14,8 +14,8 @@ func (sm *statsMedian) String() string { return "median(" + fieldNamesString(sm.fields) + ")" } -func (sm *statsMedian) neededFields() []string { - return sm.fields +func (sm *statsMedian) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sm.fields) } func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 8344087f6..bd1baecc4 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -19,8 +19,8 @@ func (sm *statsMin) String() string { return "min(" + fieldNamesString(sm.fields) + ")" } -func (sm *statsMin) neededFields() []string { - return sm.fields +func (sm *statsMin) 
updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sm.fields) } func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { @@ -124,23 +124,23 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu } case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() - bb.B = marshalUint64String(bb.B[:0], c.ch.minValue) + bb.B = marshalUint64String(bb.B[:0], c.minValue) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeFloat64: - f := math.Float64frombits(c.ch.minValue) + f := math.Float64frombits(c.minValue) bb := bbPool.Get() bb.B = marshalFloat64String(bb.B[:0], f) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeIPv4: bb := bbPool.Get() - bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.minValue)) + bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeTimestampISO8601: bb := bbPool.Get() - bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.minValue)) + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) default: diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index 310020082..74b6e0de4 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -24,8 +24,8 @@ func (sq *statsQuantile) String() string { return fmt.Sprintf("quantile(%g, %s)", sq.phi, fieldNamesString(sq.fields)) } -func (sq *statsQuantile) neededFields() []string { - return sq.fields +func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sq.fields) } func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index e70eb05dd..23b0d5136 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -16,8 +16,8 @@ func (ss *statsSum) String() string { return "sum(" + fieldNamesString(ss.fields) + ")" } -func (ss *statsSum) neededFields() []string { - return ss.fields +func (ss *statsSum) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(ss.fields) } func (ss *statsSum) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_sum_len.go b/lib/logstorage/stats_sum_len.go index f09e48418..644a421b5 100644 --- a/lib/logstorage/stats_sum_len.go +++ b/lib/logstorage/stats_sum_len.go @@ -15,8 +15,8 @@ func (ss *statsSumLen) String() string { return "sum_len(" + fieldNamesString(ss.fields) + ")" } -func (ss *statsSumLen) neededFields() []string { - return ss.fields +func (ss *statsSumLen) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(ss.fields) } func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index 6118f844a..9667d5883 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -24,8 +24,8 @@ func (su *statsUniqValues) String() string { return s } -func (su *statsUniqValues) neededFields() []string { - return su.fields +func (su *statsUniqValues) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(su.fields) } func (su *statsUniqValues) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go index 816727ba1..b7183d45b 100644 --- a/lib/logstorage/stats_values.go +++ b/lib/logstorage/stats_values.go @@ -21,8 +21,8 @@ func (sv *statsValues) String() string { return s } 
-func (sv *statsValues) neededFields() []string { - return sv.fields +func (sv *statsValues) updateNeededFields(neededFields fieldsSet) { + neededFields.addAll(sv.fields) } func (sv *statsValues) newStatsProcessor() (statsProcessor, int) { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 4eab5127d..e83aad7d8 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -470,9 +470,6 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - if !br.streamID.tenantID.equal(&tenantID) { - panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) - } rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -535,9 +532,6 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - if !br.streamID.tenantID.equal(&tenantID) { - panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) - } rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -564,9 +558,6 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - if !br.streamID.tenantID.equal(&tenantID) { - panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) - } rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -601,9 +592,6 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - if !br.streamID.tenantID.equal(&tenantID) { - panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) - } rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) diff --git a/lib/logstorage/stream_filter.go b/lib/logstorage/stream_filter.go index 0cce17229..4e3a4d2f1 100644 --- a/lib/logstorage/stream_filter.go +++ b/lib/logstorage/stream_filter.go @@ -3,6 +3,7 @@ package logstorage import ( "strconv" "strings" + "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -14,6 +15,29 @@ type StreamFilter struct { orFilters []*andStreamFilter } +func (sf *StreamFilter) matchStreamName(s string) bool { + sn := getStreamName() + defer putStreamName(sn) + + if !sn.parse(s) { + return false + } + + for _, of := range sf.orFilters { + matchAndFilters := true + for _, tf := range of.tagFilters { + if !sn.match(tf) { + matchAndFilters = false + break + } + } + if matchAndFilters { + return true + } + } + return false +} + func (sf *StreamFilter) isEmpty() bool { for _, af := range sf.orFilters { if len(af.tagFilters) > 0 { @@ -69,10 +93,96 @@ type streamTagFilter struct { regexp *regexutil.PromRegex } -func (tf *streamTagFilter) getRegexp() *regexutil.PromRegex { - return tf.regexp -} - func (tf *streamTagFilter) String() string { return quoteTokenIfNeeded(tf.tagName) + tf.op + strconv.Quote(tf.value) } + +func getStreamName() *streamName { + v := streamNamePool.Get() + if v == nil { + return &streamName{} + } + return v.(*streamName) +} + +func putStreamName(sn *streamName) { + sn.reset() + streamNamePool.Put(sn) +} + +var streamNamePool sync.Pool + +type streamName struct { + tags []Field +} + +func (sn *streamName) reset() { + clear(sn.tags) + 
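// Zero the Field entries before truncating the slice below, so the backing array doesn't keep references to the tag name and value strings.
+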
sn.tags = sn.tags[:0]
+}
+
+func (sn *streamName) parse(s string) bool {
+ if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' {
+ return false
+ }
+ s = s[1 : len(s)-1]
+ if len(s) == 0 {
+ return true
+ }
+
+ for {
+ // Parse tag name
+ n := strings.IndexByte(s, '=')
+ if n < 0 {
+ // cannot find tag name
+ return false
+ }
+ name := s[:n]
+ s = s[n+1:]
+
+ // Parse tag value
+ if len(s) == 0 || s[0] != '"' {
+ return false
+ }
+ qPrefix, err := strconv.QuotedPrefix(s)
+ if err != nil {
+ return false
+ }
+ s = s[len(qPrefix):]
+ value, err := strconv.Unquote(qPrefix)
+ if err != nil {
+ return false
+ }
+
+ sn.tags = append(sn.tags, Field{
+ Name: name,
+ Value: value,
+ })
+
+ if len(s) == 0 {
+ return true
+ }
+ if s[0] != ',' {
+ return false
+ }
+ // Skip the ',' delimiter - otherwise the next tag name would start with ','
+ s = s[1:]
+ }
+}
+
+func (sn *streamName) match(tf *streamTagFilter) bool {
+ for _, t := range sn.tags {
+ if t.Name != tf.tagName {
+ continue
+ }
+ switch tf.op {
+ case "=":
+ return t.Value == tf.value
+ case "!=":
+ return t.Value != tf.value
+ case "=~":
+ return tf.regexp.MatchString(t.Value)
+ case "!~":
+ return !tf.regexp.MatchString(t.Value)
+ }
+ }
+ return false
+}
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index bc3d4a288..493d7d42a 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -2004,7 +2004,7 @@ func createAllIndexesForMetricName(is *indexSearch, mn *MetricName, tsid *TSID,
 }
 
 func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID, date uint64) {
- // Store the TSID for for the current indexdb into cache,
+ // Store the TSID for the current indexdb into cache,
 // so future rows for that TSID are ingested via fast path.
 s.putTSIDToCache(genTSID, metricNameRaw)
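
A minimal usage sketch for the per-function 'if' filters added to the stats pipe above, assuming the package's exported ParseQuery entry point; the expected canonical form mirrors the parser_test.go cases:

	q, err := ParseQuery(`* | stats count() if (foo bar) rows`)
	if err != nil {
		// handle the malformed query
	}
	fmt.Println(q)
	// Output: * | stats count(*) if (foo bar) as rows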