This commit is contained in:
Aliaksandr Valialkin 2024-05-17 04:11:10 +02:00
parent 7e4769abad
commit 53a378faab
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
47 changed files with 1416 additions and 219 deletions

View file

@ -50,6 +50,8 @@ func (bm *bitmap) copyFrom(src *bitmap) {
}
func (bm *bitmap) init(bitsLen int) {
bm.reset()
a := bm.a
wordsLen := (bitsLen + 63) / 64
a = slicesutil.SetLength(a, wordsLen)

View file

@ -19,21 +19,12 @@ import (
//
// It is expected that its contents is accessed only from a single goroutine at a time.
type blockResult struct {
// bs is the blockSearch used for fetching block data.
bs *blockSearch
// bm is bitmap for fetching the needed rows from bs.
bm *bitmap
// a holds all the bytes behind the requested column values in the block.
a arena
// values holds all the requested column values in the block.
valuesBuf []string
// streamID is streamID for the given blockResult.
streamID streamID
// timestamps contain timestamps for the selected log entries in the block.
timestamps []int64
@ -54,16 +45,11 @@ type blockResult struct {
}
func (br *blockResult) reset() {
br.bs = nil
br.bm = nil
br.a.reset()
clear(br.valuesBuf)
br.valuesBuf = br.valuesBuf[:0]
br.streamID.reset()
br.timestamps = br.timestamps[:0]
br.csBufOffset = 0
@ -81,9 +67,6 @@ func (br *blockResult) reset() {
func (br *blockResult) clone() *blockResult {
brNew := &blockResult{}
brNew.bs = br.bs
brNew.bm = br.bm
cs := br.getColumns()
bufLen := 0
@ -98,8 +81,6 @@ func (br *blockResult) clone() *blockResult {
}
brNew.valuesBuf = make([]string, 0, valuesBufLen)
brNew.streamID = br.streamID
brNew.timestamps = make([]int64, len(br.timestamps))
copy(brNew.timestamps, br.timestamps)
@ -112,6 +93,55 @@ func (br *blockResult) clone() *blockResult {
return brNew
}
// initFromNeededColumns re-initializes br from brSrc: it keeps only the rows whose bits are set in bm
// and only the columns listed in neededColumns.
//
// br is valid until brSrc is reset or bm is updated.
func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) {
	br.reset()

	// Copy timestamps for the rows selected by bm.
	tsSrc := brSrc.timestamps
	tsDst := br.timestamps[:0]
	bm.forEachSetBitReadonly(func(rowIdx int) {
		tsDst = append(tsDst, tsSrc[rowIdx])
	})
	br.timestamps = tsDst

	for _, columnName := range neededColumns {
		src := brSrc.getColumnByName(columnName)
		dst := blockResultColumn{
			name: src.name,
		}

		switch {
		case src.isConst:
			// Const columns share the single encoded value with the source column.
			dst.isConst = true
			dst.valuesEncoded = src.valuesEncoded
		case src.isTime:
			dst.isTime = true
		default:
			dst.valueType = src.valueType
			dst.minValue = src.minValue
			dst.maxValue = src.maxValue
			dst.dictValues = src.dictValues
			// Encoded values are fetched from the source column lazily, on the first request.
			dst.newValuesEncodedFunc = func(br *blockResult) []string {
				srcValues := src.getValuesEncoded(brSrc)

				buf := br.valuesBuf
				start := len(buf)
				bm.forEachSetBitReadonly(func(rowIdx int) {
					buf = append(buf, srcValues[rowIdx])
				})
				br.valuesBuf = buf

				return buf[start:]
			}
		}

		br.csBuf = append(br.csBuf, dst)
		br.csInitialized = false
	}
}
// cloneValues clones the given values into br and returns the cloned values.
func (br *blockResult) cloneValues(values []string) []string {
valuesBufLen := len(br.valuesBuf)
@ -176,9 +206,10 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
br.csInitialized = false
}
func (br *blockResult) initAllColumns() {
bs := br.bs
// initAllColumns initializes all the columns in br according to bs and bm.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
unneededColumnNames := bs.bsw.so.unneededColumnNames
if !slices.Contains(unneededColumnNames, "_time") {
@ -201,7 +232,7 @@ func (br *blockResult) initAllColumns() {
if v != "" {
br.addConstColumn("_msg", v)
} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
br.addColumn(ch)
br.addColumn(bs, bm, ch)
} else {
br.addConstColumn("_msg", "")
}
@ -225,14 +256,15 @@ func (br *blockResult) initAllColumns() {
continue
}
if !slices.Contains(unneededColumnNames, ch.name) {
br.addColumn(ch)
br.addColumn(bs, bm, ch)
}
}
}
func (br *blockResult) initRequestedColumns() {
bs := br.bs
// initRequestedColumns initializes only the requested columns in br according to bs and bm.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) {
for _, columnName := range bs.bsw.so.neededColumnNames {
switch columnName {
case "_stream":
@ -248,7 +280,7 @@ func (br *blockResult) initRequestedColumns() {
if v != "" {
br.addConstColumn(columnName, v)
} else if ch := bs.csh.getColumnHeader(columnName); ch != nil {
br.addColumn(ch)
br.addColumn(bs, bm, ch)
} else {
br.addConstColumn(columnName, "")
}
@ -259,10 +291,6 @@ func (br *blockResult) initRequestedColumns() {
func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
br.reset()
br.bs = bs
br.bm = bm
br.streamID = bs.bsw.bh.streamID
if bm.isZero() {
// Nothing to initialize for zero matching log entries in the block.
return
@ -294,21 +322,10 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
br.timestamps = dstTimestamps
}
func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string {
if c.isConst {
logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for const column")
}
if c.isTime {
logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for time column")
}
func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string {
valuesBufLen := len(br.valuesBuf)
bs := br.bs
bm := br.bm
ch := &c.ch
switch c.valueType {
switch ch.valueType {
case valueTypeString:
visitValuesReadonly(bs, ch, bm, br.addValue)
case valueTypeDict:
@ -317,8 +334,8 @@ func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string
logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
}
dictIdx := v[0]
if int(dictIdx) >= len(c.dictValues) {
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(c.dictValues))
if int(dictIdx) >= len(ch.valuesDict.values) {
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
}
br.addValue(v)
})
@ -380,18 +397,19 @@ func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string
// addColumn adds column for the given ch to br.
//
// The added column is valid until ch is changed.
func (br *blockResult) addColumn(ch *columnHeader) {
// The added column is valid until bs, bm or ch is changed.
func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) {
br.csBuf = append(br.csBuf, blockResultColumn{
name: getCanonicalColumnName(ch.name),
ch: *ch,
valueType: ch.valueType,
minValue: ch.minValue,
maxValue: ch.maxValue,
dictValues: ch.valuesDict.values,
newValuesEncodedFunc: func(br *blockResult) []string {
return br.newValuesEncodedFromColumnHeader(bs, bm, ch)
},
})
br.csInitialized = false
c := &br.csBuf[len(br.csBuf)-1]
c.ch.valuesDict.values = nil
}
func (br *blockResult) addTimeColumn() {
@ -406,7 +424,8 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], &br.streamID)
streamID := &bs.bsw.bh.streamID
bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)
if len(bb.B) == 0 {
// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
// was recently registered and its tags aren't visible to search yet.
@ -1340,14 +1359,9 @@ func (br *blockResult) truncateRows(keepRows int) {
// blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult.
// This simplifies copying, resetting and re-using of the struct.
type blockResultColumn struct {
// name is column name.
// name is column name
name string
// ch is is used for initializing valuesEncoded for non-time and non-const columns.
//
// ch.valuesDict.values must be set to nil, since dict values for valueTypeDict are stored at dictValues.
ch columnHeader
// isConst is set to true if the column is const.
//
// The column value is stored in valuesEncoded[0]
@ -1361,6 +1375,16 @@ type blockResultColumn struct {
// valueType is the type of non-const value
valueType valueType
// minValue is the minimum encoded value for uint*, ipv4, timestamp and float64 value
//
// It is used for fast detection of whether the given column contains values in the given range
minValue uint64
// maxValue is the maximum encoded value for uint*, ipv4, timestamp and float64 value
//
// It is used for fast detection of whether the given column contains values in the given range
maxValue uint64
// dictValues contains dict values for valueType=valueTypeDict.
dictValues []string
@ -1373,6 +1397,11 @@ type blockResultColumn struct {
// valuesBucketed contains values after getValuesBucketed() call
valuesBucketed []string
// newValuesEncodedFunc must return valuesEncoded.
//
// This func must be set for non-const and non-time columns if valuesEncoded field isn't set.
newValuesEncodedFunc func(br *blockResult) []string
// bucketSizeStr contains bucketSizeStr for valuesBucketed
bucketSizeStr string
@ -1388,12 +1417,11 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
cNew.name = br.a.copyString(c.name)
cNew.ch = c.ch
cNew.ch.valuesDict.values = nil
cNew.isConst = c.isConst
cNew.isTime = c.isTime
cNew.valueType = c.valueType
cNew.minValue = c.minValue
cNew.maxValue = c.maxValue
cNew.dictValues = br.cloneValues(c.dictValues)
cNew.valuesEncoded = br.cloneValues(c.valuesEncoded)
@ -1402,6 +1430,8 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
}
cNew.valuesBucketed = br.cloneValues(c.valuesBucketed)
cNew.newValuesEncodedFunc = c.newValuesEncodedFunc
cNew.bucketSizeStr = c.bucketSizeStr
cNew.bucketOffsetStr = c.bucketOffsetStr
@ -1492,11 +1522,12 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string {
if c.isTime {
return nil
}
if values := c.valuesEncoded; values != nil {
return values
}
c.valuesEncoded = br.newValuesEncodedForColumn(c)
c.valuesEncoded = c.newValuesEncodedFunc(br)
return c.valuesEncoded
}

View file

@ -167,9 +167,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
// fetch the requested columns to bs.br.
if bs.bsw.so.needAllColumns {
bs.br.initAllColumns()
bs.br.initAllColumns(bs, bm)
} else {
bs.br.initRequestedColumns()
bs.br.initRequestedColumns(bs, bm)
}
}

View file

@ -37,12 +37,10 @@ func (fs fieldsSet) getAll() []string {
return a
}
func (fs fieldsSet) contains(field string) bool {
_, ok := fs[field]
if !ok {
_, ok = fs["*"]
func (fs fieldsSet) addAll(fields []string) {
for _, f := range fields {
fs.add(f)
}
return ok
}
func (fs fieldsSet) removeAll(fields []string) {
@ -51,22 +49,30 @@ func (fs fieldsSet) removeAll(fields []string) {
}
}
// contains reports whether fs holds the given field or the "*" wildcard.
//
// The empty field name is treated as "_msg".
func (fs fieldsSet) contains(field string) bool {
	key := field
	if key == "" {
		key = "_msg"
	}
	if _, found := fs[key]; found {
		return true
	}
	_, hasWildcard := fs["*"]
	return hasWildcard
}
// remove deletes the given field from fs.
//
// Removing "*" resets fs. If fs contains "*", then removing any other field is a no-op.
// The empty field name is treated as "_msg".
func (fs fieldsSet) remove(field string) {
	if field == "*" {
		fs.reset()
		return
	}
	if fs.contains("*") {
		return
	}
	key := field
	if key == "" {
		key = "_msg"
	}
	delete(fs, key)
}
func (fs fieldsSet) addAll(fields []string) {
for _, f := range fields {
fs.add(f)
}
}
func (fs fieldsSet) add(field string) {
if fs.contains("*") {
return
@ -76,5 +82,8 @@ func (fs fieldsSet) add(field string) {
fs["*"] = struct{}{}
return
}
if field == "" {
field = "_msg"
}
fs[field] = struct{}{}
}

View file

@ -17,9 +17,10 @@ func TestFieldsSet(t *testing.T) {
}
fs.add("foo")
fs.add("bar")
fs.add("")
s := fs.String()
if s != "[bar,foo]" {
t.Fatalf("unexpected String() result; got %s; want %s", s, "[bar,foo]")
if s != "[_msg,bar,foo]" {
t.Fatalf("unexpected String() result; got %s; want %s", s, "[_msg,bar,foo]")
}
if !fs.contains("foo") {
t.Fatalf("fs must contain foo")
@ -27,6 +28,12 @@ func TestFieldsSet(t *testing.T) {
if !fs.contains("bar") {
t.Fatalf("fs must contain bar")
}
if !fs.contains("") {
t.Fatalf("fs must contain _msg")
}
if !fs.contains("_msg") {
t.Fatalf("fs must contain _msg")
}
if fs.contains("baz") {
t.Fatalf("fs musn't contain baz")
}
@ -41,6 +48,13 @@ func TestFieldsSet(t *testing.T) {
if fs.contains("bar") {
t.Fatalf("fs mustn't contain bar")
}
fs.remove("")
if fs.contains("") {
t.Fatalf("fs mustn't contain _msg")
}
if fs.contains("_msg") {
t.Fatalf("fs mustn't contain _msg")
}
// verify *
fs.add("*")
@ -60,17 +74,17 @@ func TestFieldsSet(t *testing.T) {
}
// verify addAll, getAll, removeAll
fs.addAll([]string{"foo", "bar"})
if !fs.contains("foo") || !fs.contains("bar") {
t.Fatalf("fs must contain foo and bar")
fs.addAll([]string{"foo", "bar", "_msg"})
if !fs.contains("foo") || !fs.contains("bar") || !fs.contains("_msg") {
t.Fatalf("fs must contain foo, bar and _msg")
}
a := fs.getAll()
if !reflect.DeepEqual(a, []string{"bar", "foo"}) {
t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"bar", "foo"})
if !reflect.DeepEqual(a, []string{"_msg", "bar", "foo"}) {
t.Fatalf("unexpected result from getAll(); got %q; want %q", a, []string{"_msg", "bar", "foo"})
}
fs.removeAll([]string{"bar", "baz"})
if fs.contains("bar") || fs.contains("baz") {
t.Fatalf("fs mustn't contain bar and baz")
fs.removeAll([]string{"bar", "baz", "_msg"})
if fs.contains("bar") || fs.contains("baz") || fs.contains("_msg") {
t.Fatalf("fs mustn't contain bar, baz and _msg")
}
if !fs.contains("foo") {
t.Fatalf("fs must contain foo")

View file

@ -5,6 +5,12 @@ type filter interface {
// String returns string representation of the filter
String() string
// updateNeededFields must update neededFields with fields needed for the filter
updateNeededFields(neededFields fieldsSet)
// apply must update bm according to the filter applied to the given bs block
apply(bs *blockSearch, bm *bitmap)
// applyToBlockResult must update bm according to the filter applied to the given br block
applyToBlockResult(br *blockResult, bm *bitmap)
}

View file

@ -31,6 +31,23 @@ func (fa *filterAnd) String() string {
return strings.Join(a, " ")
}
// updateNeededFields lets every sub-filter of fa register its fields in neededFields.
func (fa *filterAnd) updateNeededFields(neededFields fieldsSet) {
	filters := fa.filters
	for i := range filters {
		filters[i].updateNeededFields(neededFields)
	}
}
// applyToBlockResult applies the sub-filters of fa to br one by one, updating bm after each of them.
func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) {
	filters := fa.filters
	for i := range filters {
		filters[i].applyToBlockResult(br, bm)
		if !bm.isZero() {
			continue
		}
		// Shortcut - the remaining filters cannot change the result,
		// since bm is already zero.
		return
	}
}
func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
if !fa.matchMessageBloomFilter(bs) {
// Fast path - fa doesn't match _msg bloom filter.

View file

@ -29,6 +29,10 @@ func (fp *filterAnyCasePhrase) String() string {
return fmt.Sprintf("%si(%s)", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.phrase))
}
// updateNeededFields adds fp.fieldName to neededFields.
func (fp *filterAnyCasePhrase) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fp.fieldName)
}
func (fp *filterAnyCasePhrase) getTokens() []string {
fp.tokensOnce.Do(fp.initTokens)
return fp.tokens
@ -47,6 +51,11 @@ func (fp *filterAnyCasePhrase) initPhraseLowercase() {
fp.phraseLowercase = strings.ToLower(fp.phrase)
}
// applyToBlockResult updates bm by matching the fp.fieldName column of br
// against the lowercase phrase with matchAnyCasePhrase.
func (fp *filterAnyCasePhrase) applyToBlockResult(br *blockResult, bm *bitmap) {
	applyToBlockResultGeneric(br, bm, fp.fieldName, fp.getPhraseLowercase(), matchAnyCasePhrase)
}
func (fp *filterAnyCasePhrase) apply(bs *blockSearch, bm *bitmap) {
fieldName := fp.fieldName
phraseLowercase := fp.getPhraseLowercase()
@ -100,10 +109,12 @@ func (fp *filterAnyCasePhrase) apply(bs *blockSearch, bm *bitmap) {
func matchValuesDictByAnyCasePhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phraseLowercase string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchAnyCasePhrase(v, phraseLowercase) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -33,6 +33,10 @@ func (fp *filterAnyCasePrefix) String() string {
return fmt.Sprintf("%si(%s*)", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.prefix))
}
// updateNeededFields adds fp.fieldName to neededFields.
func (fp *filterAnyCasePrefix) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fp.fieldName)
}
func (fp *filterAnyCasePrefix) getTokens() []string {
fp.tokensOnce.Do(fp.initTokens)
return fp.tokens
@ -51,6 +55,11 @@ func (fp *filterAnyCasePrefix) initPrefixLowercase() {
fp.prefixLowercase = strings.ToLower(fp.prefix)
}
// applyToBlockResult updates bm by matching the fp.fieldName column of br
// against the lowercase prefix with matchAnyCasePrefix.
func (fp *filterAnyCasePrefix) applyToBlockResult(br *blockResult, bm *bitmap) {
	applyToBlockResultGeneric(br, bm, fp.fieldName, fp.getPrefixLowercase(), matchAnyCasePrefix)
}
func (fp *filterAnyCasePrefix) apply(bs *blockSearch, bm *bitmap) {
fieldName := fp.fieldName
prefixLowercase := fp.getPrefixLowercase()
@ -101,10 +110,12 @@ func (fp *filterAnyCasePrefix) apply(bs *blockSearch, bm *bitmap) {
func matchValuesDictByAnyCasePrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefixLowercase string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchAnyCasePrefix(v, prefixLowercase) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -24,6 +24,10 @@ func (fe *filterExact) String() string {
return fmt.Sprintf("%sexact(%s)", quoteFieldNameIfNeeded(fe.fieldName), quoteTokenIfNeeded(fe.value))
}
// updateNeededFields adds fe.fieldName to neededFields.
func (fe *filterExact) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fe.fieldName)
}
func (fe *filterExact) getTokens() []string {
fe.tokensOnce.Do(fe.initTokens)
return fe.tokens
@ -33,6 +37,132 @@ func (fe *filterExact) initTokens() {
fe.tokens = tokenizeStrings(nil, []string{fe.value})
}
// applyToBlockResult updates bm according to the exact match of fe.value
// against the fe.fieldName column of br.
//
// Const columns are checked once against the single stored value.
// Typed columns are compared in their encoded form after parsing fe.value into the column type;
// if fe.value cannot be parsed into the column type, then bm is reset.
func (fe *filterExact) applyToBlockResult(br *blockResult, bm *bitmap) {
	needle := fe.value
	col := br.getColumnByName(fe.fieldName)

	switch {
	case col.isConst:
		if col.valuesEncoded[0] != needle {
			bm.resetBits()
		}
		return
	case col.isTime:
		matchColumnByExactValue(br, bm, col, needle)
		return
	}

	switch col.valueType {
	case valueTypeString:
		matchColumnByExactValue(br, bm, col, needle)
	case valueTypeDict:
		// Build a per-dict-entry match table, so every row is checked by a single lookup.
		bb := bbPool.Get()
		for _, dictValue := range col.dictValues {
			matched := byte(0)
			if dictValue == needle {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			dictIdx := encoded[idx][0]
			return bb.B[dictIdx] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8:
		parsed, ok := tryParseUint64(needle)
		if !ok || parsed >= (1<<8) {
			bm.resetBits()
			return
		}
		want := uint8(parsed)
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalUint8(encoded[idx])
			return got == want
		})
	case valueTypeUint16:
		parsed, ok := tryParseUint64(needle)
		if !ok || parsed >= (1<<16) {
			bm.resetBits()
			return
		}
		want := uint16(parsed)
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalUint16(encoded[idx])
			return got == want
		})
	case valueTypeUint32:
		parsed, ok := tryParseUint64(needle)
		if !ok || parsed >= (1<<32) {
			bm.resetBits()
			return
		}
		want := uint32(parsed)
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalUint32(encoded[idx])
			return got == want
		})
	case valueTypeUint64:
		want, ok := tryParseUint64(needle)
		if !ok {
			bm.resetBits()
			return
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalUint64(encoded[idx])
			return got == want
		})
	case valueTypeFloat64:
		want, ok := tryParseFloat64(needle)
		if !ok {
			bm.resetBits()
			return
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalFloat64(encoded[idx])
			return got == want
		})
	case valueTypeIPv4:
		want, ok := tryParseIPv4(needle)
		if !ok {
			bm.resetBits()
			return
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalIPv4(encoded[idx])
			return got == want
		})
	case valueTypeTimestampISO8601:
		want, ok := tryParseTimestampISO8601(needle)
		if !ok {
			bm.resetBits()
			return
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			got := unmarshalTimestampISO8601(encoded[idx])
			return got == want
		})
	default:
		logger.Panicf("FATAL: unknown valueType=%d", col.valueType)
	}
}
// matchColumnByExactValue updates bm by comparing every value of column c in br to value.
func matchColumnByExactValue(br *blockResult, bm *bitmap, c *blockResultColumn, value string) {
	columnValues := c.getValues(br)
	isExactMatch := func(idx int) bool {
		return columnValues[idx] == value
	}
	bm.forEachSetBit(isExactMatch)
}
func (fe *filterExact) apply(bs *blockSearch, bm *bitmap) {
fieldName := fe.fieldName
value := fe.value
@ -121,10 +251,12 @@ func matchFloat64ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, val
func matchValuesDictByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if v == value {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -23,6 +23,10 @@ func (fep *filterExactPrefix) String() string {
return fmt.Sprintf("%sexact(%s*)", quoteFieldNameIfNeeded(fep.fieldName), quoteTokenIfNeeded(fep.prefix))
}
// updateNeededFields adds fep.fieldName to neededFields.
func (fep *filterExactPrefix) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fep.fieldName)
}
func (fep *filterExactPrefix) getTokens() []string {
fep.tokensOnce.Do(fep.initTokens)
return fep.tokens
@ -32,6 +36,10 @@ func (fep *filterExactPrefix) initTokens() {
fep.tokens = getTokensSkipLast(fep.prefix)
}
// applyToBlockResult updates bm by matching the fep.fieldName column of br
// against fep.prefix with matchExactPrefix.
func (fep *filterExactPrefix) applyToBlockResult(br *blockResult, bm *bitmap) {
applyToBlockResultGeneric(br, bm, fep.fieldName, fep.prefix, matchExactPrefix)
}
func (fep *filterExactPrefix) apply(bs *blockSearch, bm *bitmap) {
fieldName := fep.fieldName
prefix := fep.prefix
@ -134,10 +142,12 @@ func matchFloat64ByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, pr
func matchValuesDictByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchExactPrefix(v, prefix) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -55,6 +55,10 @@ func (fi *filterIn) String() string {
return fmt.Sprintf("%sin(%s)", quoteFieldNameIfNeeded(fi.fieldName), strings.Join(a, ","))
}
// updateNeededFields adds fi.fieldName to neededFields.
func (fi *filterIn) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fi.fieldName)
}
func (fi *filterIn) getTokenSets() [][]string {
fi.tokenSetsOnce.Do(fi.initTokenSets)
return fi.tokenSets
@ -249,6 +253,94 @@ func (fi *filterIn) initTimestampISO8601Values() {
fi.timestampISO8601Values = m
}
// applyToBlockResult updates bm according to whether the fi.fieldName column of br
// holds any of fi.values.
//
// bm is reset when fi has no values. Typed columns are matched in their encoded form
// against the value sets returned by the corresponding fi.get*Values() call.
func (fi *filterIn) applyToBlockResult(br *blockResult, bm *bitmap) {
	if len(fi.values) == 0 {
		bm.resetBits()
		return
	}

	col := br.getColumnByName(fi.fieldName)

	switch {
	case col.isConst:
		wanted := fi.getStringValues()
		constValue := col.valuesEncoded[0]
		if _, found := wanted[constValue]; !found {
			bm.resetBits()
		}
		return
	case col.isTime:
		fi.matchColumnByStringValues(br, bm, col)
		return
	}

	switch col.valueType {
	case valueTypeString:
		fi.matchColumnByStringValues(br, bm, col)
	case valueTypeDict:
		// Build a per-dict-entry match table, so every row is checked by a single lookup.
		wanted := fi.getStringValues()
		bb := bbPool.Get()
		for _, dictValue := range col.dictValues {
			matched := byte(0)
			if _, found := wanted[dictValue]; found {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			dictIdx := encoded[idx][0]
			return bb.B[dictIdx] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8:
		matchColumnByBinValues(br, bm, col, fi.getUint8Values())
	case valueTypeUint16:
		matchColumnByBinValues(br, bm, col, fi.getUint16Values())
	case valueTypeUint32:
		matchColumnByBinValues(br, bm, col, fi.getUint32Values())
	case valueTypeUint64:
		matchColumnByBinValues(br, bm, col, fi.getUint64Values())
	case valueTypeFloat64:
		matchColumnByBinValues(br, bm, col, fi.getFloat64Values())
	case valueTypeIPv4:
		matchColumnByBinValues(br, bm, col, fi.getIPv4Values())
	case valueTypeTimestampISO8601:
		matchColumnByBinValues(br, bm, col, fi.getTimestampISO8601Values())
	default:
		logger.Panicf("FATAL: unknown valueType=%d", col.valueType)
	}
}
// matchColumnByStringValues updates bm by looking up every value of column c in br
// in the set returned by fi.getStringValues().
func (fi *filterIn) matchColumnByStringValues(br *blockResult, bm *bitmap, c *blockResultColumn) {
	wanted := fi.getStringValues()
	columnValues := c.getValues(br)
	bm.forEachSetBit(func(idx int) bool {
		_, found := wanted[columnValues[idx]]
		return found
	})
}
// matchColumnByBinValues updates bm by looking up every encoded value of column c in br in binValues.
//
// bm is reset if binValues is empty.
func matchColumnByBinValues(br *blockResult, bm *bitmap, c *blockResultColumn, binValues map[string]struct{}) {
	if len(binValues) == 0 {
		bm.resetBits()
		return
	}

	encoded := c.getValuesEncoded(br)
	bm.forEachSetBit(func(idx int) bool {
		_, found := binValues[encoded[idx]]
		return found
	})
}
func (fi *filterIn) apply(bs *blockSearch, bm *bitmap) {
fieldName := fi.fieldName
@ -314,6 +406,10 @@ func (fi *filterIn) apply(bs *blockSearch, bm *bitmap) {
}
func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) {
if len(values) == 0 {
bm.resetBits()
return
}
if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) {
bm.resetBits()
return
@ -344,10 +440,12 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets []
func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if _, ok := values[v]; ok {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -21,6 +21,77 @@ func (fr *filterIPv4Range) String() string {
return fmt.Sprintf("%sipv4_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), minValue, maxValue)
}
// updateNeededFields adds fr.fieldName to neededFields.
func (fr *filterIPv4Range) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fr.fieldName)
}
// applyToBlockResult updates bm according to whether the fr.fieldName column of br
// holds IPv4 values in the range [fr.minValue, fr.maxValue].
//
// bm is reset for an inverted range, for the time column and for the column types
// other than string, dict and ipv4.
func (fr *filterIPv4Range) applyToBlockResult(br *blockResult, bm *bitmap) {
	lo, hi := fr.minValue, fr.maxValue
	if lo > hi {
		bm.resetBits()
		return
	}

	col := br.getColumnByName(fr.fieldName)

	switch {
	case col.isConst:
		if !matchIPv4Range(col.valuesEncoded[0], lo, hi) {
			bm.resetBits()
		}
		return
	case col.isTime:
		bm.resetBits()
		return
	}

	switch col.valueType {
	case valueTypeString:
		columnValues := col.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			return matchIPv4Range(columnValues[idx], lo, hi)
		})
	case valueTypeDict:
		// Build a per-dict-entry match table, so every row is checked by a single lookup.
		bb := bbPool.Get()
		for _, dictValue := range col.dictValues {
			matched := byte(0)
			if matchIPv4Range(dictValue, lo, hi) {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			dictIdx := encoded[idx][0]
			return bb.B[dictIdx] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeTimestampISO8601:
		bm.resetBits()
	case valueTypeIPv4:
		encoded := col.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			ip := unmarshalIPv4(encoded[idx])
			return lo <= ip && ip <= hi
		})
	default:
		logger.Panicf("FATAL: unknown valueType=%d", col.valueType)
	}
}
func (fr *filterIPv4Range) apply(bs *blockSearch, bm *bitmap) {
fieldName := fr.fieldName
minValue := fr.minValue
@ -73,10 +144,12 @@ func (fr *filterIPv4Range) apply(bs *blockSearch, bm *bitmap) {
func matchValuesDictByIPv4Range(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue uint32) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchIPv4Range(v, minValue, maxValue) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -21,6 +21,100 @@ func (fr *filterLenRange) String() string {
return quoteFieldNameIfNeeded(fr.fieldName) + "len_range" + fr.stringRepr
}
// updateNeededFields adds fr.fieldName to neededFields.
func (fr *filterLenRange) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fr.fieldName)
}
// applyToBlockResult updates bm according to whether the values of the fr.fieldName column of br
// have length in the range [fr.minLen, fr.maxLen].
//
// bm is reset for an inverted range and for typed columns whose string representation
// cannot fit the requested length range.
func (fr *filterLenRange) applyToBlockResult(br *blockResult, bm *bitmap) {
	minLen := fr.minLen
	maxLen := fr.maxLen

	if minLen > maxLen {
		bm.resetBits()
		return
	}

	c := br.getColumnByName(fr.fieldName)
	if c.isConst {
		v := c.valuesEncoded[0]
		if !matchLenRange(v, minLen, maxLen) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
		// The time column is fully handled above. Return here, so it doesn't fall through
		// into the valueType switch below, which is intended for regular columns only.
		return
	}

	switch c.valueType {
	case valueTypeString:
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeDict:
		// Build a per-dict-entry match table, so every row is checked by a single lookup.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			c := byte(0)
			if matchLenRange(v, minLen, maxLen) {
				c = 1
			}
			bb.B = append(bb.B, c)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8:
		// uint8 values occupy from 1 to 3 decimal digits.
		if minLen > 3 || maxLen == 0 {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeUint16:
		// uint16 values occupy from 1 to 5 decimal digits.
		if minLen > 5 || maxLen == 0 {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeUint32:
		// uint32 values occupy from 1 to 10 decimal digits.
		if minLen > 10 || maxLen == 0 {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeUint64:
		// uint64 values occupy from 1 to 20 decimal digits.
		if minLen > 20 || maxLen == 0 {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeFloat64:
		if minLen > 24 || maxLen == 0 {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeIPv4:
		if minLen > uint64(len("255.255.255.255")) || maxLen < uint64(len("0.0.0.0")) {
			bm.resetBits()
			return
		}
		matchColumnByLenRange(br, bm, c, minLen, maxLen)
	case valueTypeTimestampISO8601:
		matchTimestampISO8601ByLenRange(bm, minLen, maxLen)
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// matchColumnByLenRange updates bm by checking every value of column c in br
// with matchLenRange against [minLen, maxLen].
func matchColumnByLenRange(br *blockResult, bm *bitmap, c *blockResultColumn, minLen, maxLen uint64) {
	columnValues := c.getValues(br)
	bm.forEachSetBit(func(idx int) bool {
		return matchLenRange(columnValues[idx], minLen, maxLen)
	})
}
func (fr *filterLenRange) apply(bs *blockSearch, bm *bitmap) {
fieldName := fr.fieldName
minLen := fr.minLen
@ -110,10 +204,12 @@ func matchFloat64ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLe
func matchValuesDictByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchLenRange(v, minLen, maxLen) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)
@ -126,6 +222,10 @@ func matchStringByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen
}
func matchUint8ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) {
if minLen > 3 || maxLen == 0 {
bm.resetBits()
return
}
if !matchMinMaxValueLen(ch, minLen, maxLen) {
bm.resetBits()
return
@ -140,6 +240,10 @@ func matchUint8ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen,
}
func matchUint16ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) {
if minLen > 5 || maxLen == 0 {
bm.resetBits()
return
}
if !matchMinMaxValueLen(ch, minLen, maxLen) {
bm.resetBits()
return
@ -154,6 +258,10 @@ func matchUint16ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen
}
func matchUint32ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) {
if minLen > 10 || maxLen == 0 {
bm.resetBits()
return
}
if !matchMinMaxValueLen(ch, minLen, maxLen) {
bm.resetBits()
return
@ -168,6 +276,10 @@ func matchUint32ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen
}
func matchUint64ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, maxLen uint64) {
if minLen > 20 || maxLen == 0 {
bm.resetBits()
return
}
if !matchMinMaxValueLen(ch, minLen, maxLen) {
bm.resetBits()
return

View file

@ -8,6 +8,14 @@ func (fn *filterNoop) String() string {
return ""
}
// updateNeededFields is a no-op: the passed fields set is left unchanged.
func (fn *filterNoop) updateNeededFields(_ fieldsSet) {
	// intentionally empty
}
// applyToBlockResult is a no-op: the passed bitmap is left unchanged.
func (fn *filterNoop) applyToBlockResult(_ *blockResult, _ *bitmap) {
	// intentionally empty
}
// apply is a no-op: the passed bitmap is left unchanged.
func (fn *filterNoop) apply(_ *blockSearch, _ *bitmap) {
	// intentionally empty
}

View file

@ -16,6 +16,20 @@ func (fn *filterNot) String() string {
return "!" + s
}
// updateNeededFields forwards neededFields to the wrapped filter fn.f.
func (fn *filterNot) updateNeededFields(neededFields fieldsSet) {
	inner := fn.f
	inner.updateNeededFields(neededFields)
}
// applyToBlockResult applies the negation of fn.f to the rows selected by bm.
//
// The wrapped filter runs on a copy of bm, so it inspects only the rows
// that are still candidates. Whatever remains in the copy is then
// subtracted from bm via andNot.
func (fn *filterNot) applyToBlockResult(br *blockResult, bm *bitmap) {
	bmInner := getBitmap(bm.bitsLen)
	bmInner.copyFrom(bm)

	inner := fn.f
	inner.applyToBlockResult(br, bmInner)

	bm.andNot(bmInner)
	putBitmap(bmInner)
}
func (fn *filterNot) apply(bs *blockSearch, bm *bitmap) {
// Minimize the number of rows to check by the filter by applying it
// only to the rows, which match the bm, e.g. they may change the bm result.

View file

@ -21,6 +21,35 @@ func (fo *filterOr) String() string {
return strings.Join(a, " or ")
}
// updateNeededFields forwards neededFields to every filter in fo.filters, in order.
func (fo *filterOr) updateNeededFields(neededFields fieldsSet) {
	filters := fo.filters
	for i := range filters {
		filters[i].updateNeededFields(neededFields)
	}
}
// applyToBlockResult applies the OR of fo.filters to the rows selected by bm.
//
// bmMatched accumulates the result of the filters applied so far.
// bmPending is the scratch bitmap handed to each filter; before every filter it is
// rebuilt as (bm AND NOT bmMatched), so a filter sees only the rows the caller asked for
// and that none of the previous filters has accepted yet.
func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) {
	bitsLen := bm.bitsLen
	bmMatched := getBitmap(bitsLen)
	bmPending := getBitmap(bitsLen)

	filters := fo.filters
	for i := range filters {
		bmPending.copyFrom(bm)
		bmPending.andNot(bmMatched)
		if bmPending.isZero() {
			// Nothing left to check - the remaining filters cannot change the result.
			break
		}

		filters[i].applyToBlockResult(br, bmPending)
		bmMatched.or(bmPending)
	}
	putBitmap(bmPending)

	bm.copyFrom(bmMatched)
	putBitmap(bmMatched)
}
func (fo *filterOr) apply(bs *blockSearch, bm *bitmap) {
bmResult := getBitmap(bm.bitsLen)
bmTmp := getBitmap(bm.bitsLen)

View file

@ -32,6 +32,10 @@ func (fp *filterPhrase) String() string {
return quoteFieldNameIfNeeded(fp.fieldName) + quoteTokenIfNeeded(fp.phrase)
}
// updateNeededFields adds the field name the phrase filter is bound to into neededFields.
func (fp *filterPhrase) updateNeededFields(neededFields fieldsSet) {
	fieldName := fp.fieldName
	neededFields.add(fieldName)
}
func (fp *filterPhrase) getTokens() []string {
fp.tokensOnce.Do(fp.initTokens)
return fp.tokens
@ -41,6 +45,10 @@ func (fp *filterPhrase) initTokens() {
fp.tokens = tokenizeStrings(nil, []string{fp.phrase})
}
// applyToBlockResult delegates to applyToBlockResultGeneric with matchPhrase
// as the matcher for the fp.fieldName column and fp.phrase.
func (fp *filterPhrase) applyToBlockResult(br *blockResult, bm *bitmap) {
	fieldName, phrase := fp.fieldName, fp.phrase
	applyToBlockResultGeneric(br, bm, fieldName, phrase, matchPhrase)
}
func (fp *filterPhrase) apply(bs *blockSearch, bm *bitmap) {
fieldName := fp.fieldName
phrase := fp.phrase
@ -168,10 +176,12 @@ func matchFloat64ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase
func matchValuesDictByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchPhrase(v, phrase) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)
@ -249,7 +259,7 @@ func getPhrasePos(s, phrase string) int {
}
func matchEncodedValuesDict(bs *blockSearch, ch *columnHeader, bm *bitmap, encodedValues []byte) {
if len(encodedValues) == 0 {
if bytes.IndexByte(encodedValues, 1) < 0 {
// Fast path - the phrase is missing in the valuesDict
bm.resetBits()
return
@ -259,8 +269,11 @@ func matchEncodedValuesDict(bs *blockSearch, ch *columnHeader, bm *bitmap, encod
if len(v) != 1 {
logger.Panicf("FATAL: %s: unexpected length for dict value: got %d; want 1", bs.partPath(), len(v))
}
n := bytes.IndexByte(encodedValues, v[0])
return n >= 0
idx := v[0]
if int(idx) >= len(encodedValues) {
logger.Panicf("FATAL: %s: too big index for dict value; got %d; must be smaller than %d", bs.partPath(), idx, len(encodedValues))
}
return encodedValues[idx] == 1
})
}
@ -320,3 +333,81 @@ func toTimestampISO8601String(bs *blockSearch, bb *bytesutil.ByteBuffer, v strin
bb.B = marshalTimestampISO8601String(bb.B[:0], timestamp)
return bytesutil.ToUnsafeString(bb.B)
}
// applyToBlockResultGeneric filters the rows selected by bm by running
// matchFunc(value, phrase) on the values of the fieldName column of br.
//
// phrase is passed to matchFunc verbatim and is not interpreted here.
// Some callers pass an empty phrase and ignore it inside matchFunc
// (matching by regexp, by sequence or by string range instead).
// That is why the numeric columns must not be short-circuited by trying to parse
// phrase as a number: such a shortcut would reset bm for every matcher
// that doesn't rely on phrase (the empty phrase presumably isn't a valid number).
//
// For const columns matchFunc is evaluated once on the single value.
// For dict columns matchFunc is evaluated once per dictionary entry.
// For all the other columns it is evaluated per selected row.
func applyToBlockResultGeneric(br *blockResult, bm *bitmap, fieldName, phrase string, matchFunc func(v, phrase string) bool) {
	c := br.getColumnByName(fieldName)
	if c.isConst {
		v := c.valuesEncoded[0]
		if !matchFunc(v, phrase) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc)
		return
	}

	switch c.valueType {
	case valueTypeDict:
		// Evaluate matchFunc once per dictionary entry and then look up
		// the per-entry verdict by the encoded dict index of every row.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			matched := byte(0)
			if matchFunc(v, phrase) {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeString,
		valueTypeUint8,
		valueTypeUint16,
		valueTypeUint32,
		valueTypeUint64,
		valueTypeFloat64,
		valueTypeIPv4,
		valueTypeTimestampISO8601:
		matchColumnByPhraseGeneric(br, bm, c, phrase, matchFunc)
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// matchColumnByPhraseGeneric runs matchFunc(value, phrase) on the values of column c
// for every bit set in bm and returns the verdict to bm.forEachSetBit.
func matchColumnByPhraseGeneric(br *blockResult, bm *bitmap, c *blockResultColumn, phrase string, matchFunc func(v, phrase string) bool) {
	columnValues := c.getValues(br)
	matchRow := func(rowIdx int) bool {
		v := columnValues[rowIdx]
		return matchFunc(v, phrase)
	}
	bm.forEachSetBit(matchRow)
}

View file

@ -30,6 +30,10 @@ func (fp *filterPrefix) String() string {
return fmt.Sprintf("%s%s*", quoteFieldNameIfNeeded(fp.fieldName), quoteTokenIfNeeded(fp.prefix))
}
// updateNeededFields adds the field name the prefix filter is bound to into neededFields.
func (fp *filterPrefix) updateNeededFields(neededFields fieldsSet) {
	fieldName := fp.fieldName
	neededFields.add(fieldName)
}
func (fp *filterPrefix) getTokens() []string {
fp.tokensOnce.Do(fp.initTokens)
return fp.tokens
@ -39,6 +43,10 @@ func (fp *filterPrefix) initTokens() {
fp.tokens = getTokensSkipLast(fp.prefix)
}
// applyToBlockResult delegates to applyToBlockResultGeneric with matchPrefix
// as the matcher for the fp.fieldName column and fp.prefix.
func (fp *filterPrefix) applyToBlockResult(br *blockResult, bm *bitmap) {
	fieldName, prefix := fp.fieldName, fp.prefix
	applyToBlockResultGeneric(br, bm, fieldName, prefix, matchPrefix)
}
func (fp *filterPrefix) apply(bs *blockSearch, bm *bitmap) {
fieldName := fp.fieldName
prefix := fp.prefix
@ -158,10 +166,12 @@ func matchFloat64ByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix
func matchValuesDictByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchPrefix(v, prefix) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -21,6 +21,120 @@ func (fr *filterRange) String() string {
return quoteFieldNameIfNeeded(fr.fieldName) + "range" + fr.stringRepr
}
// updateNeededFields adds the field name the range filter is bound to into neededFields.
func (fr *filterRange) updateNeededFields(neededFields fieldsSet) {
	fieldName := fr.fieldName
	neededFields.add(fieldName)
}
// applyToBlockResult filters the rows selected by bm, keeping those whose value
// in the fr.fieldName column of br falls into [fr.minValue, fr.maxValue].
//
// All the bits are reset when minValue > maxValue, as well as for the `isTime` column
// and for IPv4 and ISO8601 timestamp columns.
//
// For numeric columns the block-level c.minValue / c.maxValue are consulted first
// in order to skip the per-row scan when the ranges do not intersect.
func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) {
	minValue := fr.minValue
	maxValue := fr.maxValue

	if minValue > maxValue {
		bm.resetBits()
		return
	}

	c := br.getColumnByName(fr.fieldName)
	if c.isConst {
		v := c.valuesEncoded[0]
		if !matchRange(v, minValue, maxValue) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		bm.resetBits()
		return
	}

	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			v := values[idx]
			return matchRange(v, minValue, maxValue)
		})
	case valueTypeDict:
		// Evaluate matchRange once per dictionary entry and then look up
		// the per-entry verdict by the encoded dict index of every row.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			matched := byte(0)
			if matchRange(v, minValue, maxValue) {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8:
		minValueUint, maxValueUint := toUint64Range(minValue, maxValue)
		if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue {
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			n := uint64(unmarshalUint8(v))
			return n >= minValueUint && n <= maxValueUint
		})
	case valueTypeUint16:
		minValueUint, maxValueUint := toUint64Range(minValue, maxValue)
		if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue {
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			n := uint64(unmarshalUint16(v))
			return n >= minValueUint && n <= maxValueUint
		})
	case valueTypeUint32:
		minValueUint, maxValueUint := toUint64Range(minValue, maxValue)
		if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue {
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			n := uint64(unmarshalUint32(v))
			return n >= minValueUint && n <= maxValueUint
		})
	case valueTypeUint64:
		minValueUint, maxValueUint := toUint64Range(minValue, maxValue)
		if maxValue < 0 || minValueUint > c.maxValue || maxValueUint < c.minValue {
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			n := unmarshalUint64(v)
			return n >= minValueUint && n <= maxValueUint
		})
	case valueTypeFloat64:
		// c.minValue and c.maxValue hold the float64 bits for float64 columns.
		if minValue > math.Float64frombits(c.maxValue) || maxValue < math.Float64frombits(c.minValue) {
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			f := unmarshalFloat64(v)
			return f >= minValue && f <= maxValue
		})
	case valueTypeIPv4:
		// IPv4 is a regular column type, so it must not reach the `unknown valueType` panic below.
		// It is treated the same way as ISO8601 timestamps - nothing matches.
		bm.resetBits()
	case valueTypeTimestampISO8601:
		bm.resetBits()
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
func (fr *filterRange) apply(bs *blockSearch, bm *bitmap) {
fieldName := fr.fieldName
minValue := fr.minValue
@ -88,10 +202,12 @@ func matchFloat64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue
func matchValuesDictByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue float64) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchRange(v, minValue, maxValue) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -19,6 +19,17 @@ func (fr *filterRegexp) String() string {
return fmt.Sprintf("%sre(%q)", quoteFieldNameIfNeeded(fr.fieldName), fr.re.String())
}
// updateNeededFields adds the field name the regexp filter is bound to into neededFields.
func (fr *filterRegexp) updateNeededFields(neededFields fieldsSet) {
	fieldName := fr.fieldName
	neededFields.add(fieldName)
}
// applyToBlockResult delegates to applyToBlockResultGeneric with a matcher
// that checks values against fr.re.
//
// The phrase argument is unused by the matcher, so an empty phrase is passed.
func (fr *filterRegexp) applyToBlockResult(br *blockResult, bm *bitmap) {
	compiled := fr.re
	matchRegexp := func(v, _ string) bool {
		return compiled.MatchString(v)
	}
	applyToBlockResultGeneric(br, bm, fr.fieldName, "", matchRegexp)
}
func (fr *filterRegexp) apply(bs *blockSearch, bm *bitmap) {
fieldName := fr.fieldName
re := fr.re
@ -95,10 +106,12 @@ func matchFloat64ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *reg
func matchValuesDictByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp.Regexp) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if re.MatchString(v) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -31,6 +31,10 @@ func (fs *filterSequence) String() string {
return fmt.Sprintf("%sseq(%s)", quoteFieldNameIfNeeded(fs.fieldName), strings.Join(a, ","))
}
// updateNeededFields adds the field name the sequence filter is bound to into neededFields.
func (fs *filterSequence) updateNeededFields(neededFields fieldsSet) {
	fieldName := fs.fieldName
	neededFields.add(fieldName)
}
func (fs *filterSequence) getTokens() []string {
fs.tokensOnce.Do(fs.initTokens)
return fs.tokens
@ -58,6 +62,17 @@ func (fs *filterSequence) initNonEmptyPhrases() {
fs.nonEmptyPhrases = result
}
// applyToBlockResult delegates to applyToBlockResultGeneric with a matcher
// that checks values via matchSequence against the non-empty phrases of fs.
//
// bm is left untouched when there are no non-empty phrases.
// The phrase argument is unused by the matcher, so an empty phrase is passed.
func (fs *filterSequence) applyToBlockResult(br *blockResult, bm *bitmap) {
	nonEmptyPhrases := fs.getNonEmptyPhrases()
	if len(nonEmptyPhrases) > 0 {
		matchSeq := func(v, _ string) bool {
			return matchSequence(v, nonEmptyPhrases)
		}
		applyToBlockResultGeneric(br, bm, fs.fieldName, "", matchSeq)
	}
}
func (fs *filterSequence) apply(bs *blockSearch, bm *bitmap) {
fieldName := fs.fieldName
phrases := fs.getNonEmptyPhrases()
@ -171,10 +186,12 @@ func matchFloat64BySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phras
func matchValuesDictBySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phrases []string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchSequence(v, phrases) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -2,6 +2,8 @@ package logstorage
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// filterStream is the filter for `_stream:{...}`
@ -27,6 +29,10 @@ func (fs *filterStream) String() string {
return "_stream:" + s
}
// updateNeededFields registers the `_stream` field in neededFields.
func (fs *filterStream) updateNeededFields(neededFields fieldsSet) {
	const streamFieldName = "_stream"
	neededFields.add(streamFieldName)
}
func (fs *filterStream) getStreamIDs() map[streamID]struct{} {
fs.streamIDsOnce.Do(fs.initStreamIDs)
return fs.streamIDs
@ -41,6 +47,65 @@ func (fs *filterStream) initStreamIDs() {
fs.streamIDs = m
}
// applyToBlockResult filters the rows selected by bm, keeping those whose `_stream` value in br
// matches fs.f according to fs.f.matchStreamName.
//
// bm is left untouched when fs.f is empty.
// All the bits are reset for the `isTime` column and for numeric, IPv4 and ISO8601 timestamp columns.
func (fs *filterStream) applyToBlockResult(br *blockResult, bm *bitmap) {
	if fs.f.isEmpty() {
		return
	}

	c := br.getColumnByName("_stream")
	if c.isConst {
		v := c.valuesEncoded[0]
		if !fs.f.matchStreamName(v) {
			bm.resetBits()
		}
		// The single value decides the outcome for the whole const column,
		// so return unconditionally instead of falling through to the valueType handling below.
		return
	}
	if c.isTime {
		bm.resetBits()
		return
	}

	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			v := values[idx]
			return fs.f.matchStreamName(v)
		})
	case valueTypeDict:
		// Evaluate the stream filter once per dictionary entry and then look up
		// the per-entry verdict by the encoded dict index of every row.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			matched := byte(0)
			if fs.f.matchStreamName(v) {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8:
		bm.resetBits()
	case valueTypeUint16:
		bm.resetBits()
	case valueTypeUint32:
		bm.resetBits()
	case valueTypeUint64:
		bm.resetBits()
	case valueTypeFloat64:
		bm.resetBits()
	case valueTypeIPv4:
		bm.resetBits()
	case valueTypeTimestampISO8601:
		bm.resetBits()
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
func (fs *filterStream) apply(bs *blockSearch, bm *bitmap) {
if fs.f.isEmpty() {
return

View file

@ -22,6 +22,24 @@ func (fr *filterStringRange) String() string {
return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), quoteTokenIfNeeded(fr.minValue), quoteTokenIfNeeded(fr.maxValue))
}
// updateNeededFields adds the field name the string range filter is bound to into neededFields.
func (fr *filterStringRange) updateNeededFields(neededFields fieldsSet) {
	fieldName := fr.fieldName
	neededFields.add(fieldName)
}
// applyToBlockResult delegates to applyToBlockResultGeneric with a matcher
// that checks values via matchStringRange against fr.minValue and fr.maxValue.
//
// All the bits in bm are reset when fr.minValue > fr.maxValue.
// The phrase argument is unused by the matcher, so an empty phrase is passed.
func (fr *filterStringRange) applyToBlockResult(br *blockResult, bm *bitmap) {
	lo, hi := fr.minValue, fr.maxValue
	if lo > hi {
		bm.resetBits()
		return
	}

	inStringRange := func(v, _ string) bool {
		return matchStringRange(v, lo, hi)
	}
	applyToBlockResultGeneric(br, bm, fr.fieldName, "", inStringRange)
}
func (fr *filterStringRange) apply(bs *blockSearch, bm *bitmap) {
fieldName := fr.fieldName
minValue := fr.minValue
@ -117,10 +135,12 @@ func matchFloat64ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, mi
func matchValuesDictByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue string) {
bb := bbPool.Get()
for i, v := range ch.valuesDict.values {
for _, v := range ch.valuesDict.values {
c := byte(0)
if matchStringRange(v, minValue, maxValue) {
bb.B = append(bb.B, byte(i))
c = 1
}
bb.B = append(bb.B, c)
}
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)

View file

@ -197,11 +197,6 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
}
workersCount := 3
s.search(workersCount, so, nil, func(_ uint, br *blockResult) {
// Verify tenantID
if !br.streamID.tenantID.equal(&tenantID) {
t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID)
}
// Verify columns
cs := br.getColumns()
if len(cs) != 1 {

View file

@ -1,5 +1,9 @@
package logstorage
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// filterTime filters by time.
//
// It is expressed as `_time:(start, end]` in LogsQL.
@ -18,6 +22,94 @@ func (ft *filterTime) String() string {
return "_time:" + ft.stringRepr
}
// updateNeededFields registers the `_time` field in neededFields.
func (ft *filterTime) updateNeededFields(neededFields fieldsSet) {
	const timeFieldName = "_time"
	neededFields.add(timeFieldName)
}
// applyToBlockResult filters the rows selected by bm, keeping those whose `_time` value in br
// belongs to [ft.minTimestamp, ft.maxTimestamp].
//
// The real `isTime` column is checked against br.timestamps.
// String-like values (const, string and dict columns) are parsed via matchTimestampString.
// ISO8601 timestamp columns are unmarshaled and checked via matchTimestampValue.
// All the bits are reset for numeric and IPv4 columns, and when minTimestamp > maxTimestamp.
func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) {
	if ft.minTimestamp > ft.maxTimestamp {
		bm.resetBits()
		return
	}

	c := br.getColumnByName("_time")
	switch {
	case c.isConst:
		if !ft.matchTimestampString(c.valuesEncoded[0]) {
			bm.resetBits()
		}
		return
	case c.isTime:
		timestamps := br.timestamps
		bm.forEachSetBit(func(rowIdx int) bool {
			return ft.matchTimestampValue(timestamps[rowIdx])
		})
		return
	}

	switch c.valueType {
	case valueTypeString:
		columnValues := c.getValues(br)
		bm.forEachSetBit(func(rowIdx int) bool {
			return ft.matchTimestampString(columnValues[rowIdx])
		})
	case valueTypeDict:
		// Evaluate the filter once per dictionary entry and then look up
		// the per-entry verdict by the encoded dict index of every row.
		bb := bbPool.Get()
		for _, dictValue := range c.dictValues {
			matched := byte(0)
			if ft.matchTimestampString(dictValue) {
				matched = 1
			}
			bb.B = append(bb.B, matched)
		}
		encoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(rowIdx int) bool {
			dictIdx := encoded[rowIdx][0]
			return bb.B[dictIdx] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8,
		valueTypeUint16,
		valueTypeUint32,
		valueTypeUint64,
		valueTypeFloat64,
		valueTypeIPv4:
		bm.resetBits()
	case valueTypeTimestampISO8601:
		encoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(rowIdx int) bool {
			ts := unmarshalTimestampISO8601(encoded[rowIdx])
			return ft.matchTimestampValue(ts)
		})
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// matchTimestampString parses v via tryParseTimestampRFC3339Nano and reports
// whether the parsed timestamp passes matchTimestampValue.
//
// It returns false if v cannot be parsed.
func (ft *filterTime) matchTimestampString(v string) bool {
	if ts, ok := tryParseTimestampRFC3339Nano(v); ok {
		return ft.matchTimestampValue(ts)
	}
	return false
}
// matchTimestampValue reports whether timestamp belongs to the closed range
// [ft.minTimestamp, ft.maxTimestamp].
func (ft *filterTime) matchTimestampValue(timestamp int64) bool {
	if timestamp < ft.minTimestamp {
		return false
	}
	return timestamp <= ft.maxTimestamp
}
func (ft *filterTime) apply(bs *blockSearch, bm *bitmap) {
minTimestamp := ft.minTimestamp
maxTimestamp := ft.maxTimestamp

View file

@ -268,7 +268,7 @@ func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTag
}
return ids
case "=~":
re := tf.getRegexp()
re := tf.regexp
if re.MatchString("") {
// (field=~"|re") => (field="" or field=~"re")
ids := is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
@ -280,7 +280,7 @@ func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTag
}
return is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
case "!~":
re := tf.getRegexp()
re := tf.regexp
if re.MatchString("") {
// (field!~"|re") => (field!="" and not field=~"re")
ids := is.getStreamIDsForTagName(tenantID, tf.tagName)

View file

@ -968,6 +968,13 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats by (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`)
f(`* | stats (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`)
// stats pipe with per-func filters
f(`* | stats count() if (foo bar) rows`, `* | stats count(*) if (foo bar) as rows`)
f(`* | stats by (_time:1d offset -2h, f2)
count() if (is_admin:true or _msg:"foo bar"*) as foo,
sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`,
`* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`)
// sort pipe
f(`* | sort`, `* | sort`)
f(`* | sort desc`, `* | sort desc`)

View file

@ -80,9 +80,12 @@ func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}
shards := make([]pipeSortProcessorShard, workersCount)
for i := range shards {
shard := &shards[i]
shard.ps = ps
shard.stateSizeBudget = stateSizeBudgetChunk
shards[i] = pipeSortProcessorShard{
pipeSortProcessorShardNopad: pipeSortProcessorShardNopad{
ps: ps,
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}

View file

@ -28,6 +28,12 @@ type pipeStatsFunc struct {
// f is stats function to execute
f statsFunc
// neededFieldsForFunc contains needed fields for f execution
neededFieldsForFunc []string
// iff is an additional filter, which is applied to results before executing f on them
iff filter
// resultName is the name of the output generated by f
resultName string
}
@ -36,12 +42,12 @@ type statsFunc interface {
// String returns string representation of statsFunc
String() string
// neededFields returns the needed fields for calculating the given stats
neededFields() []string
// updateNeededFields updates neededFields with the fields needed for calculating the given stats
updateNeededFields(neededFields fieldsSet)
// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc.
// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc
//
// It also must return the size in bytes of the returned statsProcessor.
// It also must return the size in bytes of the returned statsProcessor
newStatsProcessor() (statsProcessor, int)
}
@ -82,7 +88,12 @@ func (ps *pipeStats) String() string {
}
a := make([]string, len(ps.funcs))
for i, f := range ps.funcs {
a[i] = f.f.String() + " as " + quoteTokenIfNeeded(f.resultName)
line := f.f.String()
if f.iff != nil {
line += " if (" + f.iff.String() + ")"
}
line += " as " + quoteTokenIfNeeded(f.resultName)
a[i] = line
}
s += strings.Join(a, ", ")
return s
@ -97,10 +108,12 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet)
neededFields.add(bf.name)
}
for i, f := range ps.funcs {
for _, f := range ps.funcs {
if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) {
funcFields := ps.funcs[i].f.neededFields()
neededFields.addAll(funcFields)
f.f.updateNeededFields(neededFields)
if f.iff != nil {
f.iff.updateNeededFields(neededFields)
}
}
}
@ -113,11 +126,21 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
maxStateSize := int64(float64(memory.Allowed()) * 0.3)
shards := make([]pipeStatsProcessorShard, workersCount)
funcsLen := len(ps.funcs)
for i := range shards {
shard := &shards[i]
shard.ps = ps
shard.m = make(map[string]*pipeStatsGroup)
shard.stateSizeBudget = stateSizeBudgetChunk
shards[i] = pipeStatsProcessorShard{
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
ps: ps,
m: make(map[string]*pipeStatsGroup),
bms: make([]bitmap, funcsLen),
brs: make([]*blockResult, funcsLen),
brsBuf: make([]blockResult, funcsLen),
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}
@ -157,7 +180,13 @@ type pipeStatsProcessorShard struct {
type pipeStatsProcessorShardNopad struct {
ps *pipeStats
m map[string]*pipeStatsGroup
m map[string]*pipeStatsGroup
// bms, brs and brsBuf are used for applying per-func filters.
bms []bitmap
brs []*blockResult
brsBuf []blockResult
columnValues [][]string
keyBuf []byte
@ -168,10 +197,14 @@ type pipeStatsProcessorShardNopad struct {
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
byFields := shard.ps.byFields
// Apply per-function filters
brs := shard.applyPerFunctionFilters(br)
// Process stats for the defined functions
if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key.
psg := shard.getPipeStatsGroup(nil)
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
return
}
if len(byFields) == 1 {
@ -183,7 +216,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
v := br.getBucketedValue(c.valuesEncoded[0], bf)
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
return
}
@ -192,7 +225,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
// Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
return
}
@ -204,7 +237,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
psg = shard.getPipeStatsGroup(keyBuf)
}
shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
}
shard.keyBuf = keyBuf
return
@ -234,7 +267,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
}
psg := shard.getPipeStatsGroup(keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
shard.keyBuf = keyBuf
return
}
@ -259,11 +292,40 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
}
psg = shard.getPipeStatsGroup(keyBuf)
}
shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
}
shard.keyBuf = keyBuf
}
// applyPerFunctionFilters applies the optional per-function `iff` filters to brSrc
// and returns the per-function block results stored in shard.brs.
//
// The i-th item of the returned slice corresponds to shard.ps.funcs[i]:
//   - it is brSrc itself if the function has no `iff` filter or if the filter keeps all the rows;
//   - otherwise it is shard.brsBuf[i], initialized from brSrc with the rows selected by shard.bms[i]
//     and with the columns listed in neededFieldsForFunc.
//
// The returned slice and its items are reused across calls.
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult {
	results := shard.brs
	statsFuncs := shard.ps.funcs
	for i := range statsFuncs {
		sf := &statsFuncs[i]
		if sf.iff == nil {
			// No filter for this function - it sees the source block as is.
			results[i] = brSrc
			continue
		}

		// Start with all the rows selected and let the filter drop the non-matching ones.
		rowsMask := &shard.bms[i]
		rowsMask.init(len(brSrc.timestamps))
		rowsMask.setBits()
		sf.iff.applyToBlockResult(brSrc, rowsMask)

		if rowsMask.areAllBitsSet() {
			// The filter kept every row - there is no need in copying anything.
			results[i] = brSrc
		} else {
			filtered := &shard.brsBuf[i]
			filtered.initFromNeededColumns(brSrc, rowsMask, sf.neededFieldsForFunc)
			results[i] = filtered
		}
	}
	return results
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
psg := shard.m[string(key)]
if psg != nil {
@ -289,18 +351,18 @@ type pipeStatsGroup struct {
sfps []statsProcessor
}
func (psg *pipeStatsGroup) updateStatsForAllRows(br *blockResult) int {
func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int {
n := 0
for _, sfp := range psg.sfps {
n += sfp.updateStatsForAllRows(br)
for i, sfp := range psg.sfps {
n += sfp.updateStatsForAllRows(brs[i])
}
return n
}
func (psg *pipeStatsGroup) updateStatsForRow(br *blockResult, rowIdx int) int {
func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int {
n := 0
for _, sfp := range psg.sfps {
n += sfp.updateStatsForRow(br, rowIdx)
for i, sfp := range psg.sfps {
n += sfp.updateStatsForRow(brs[i], rowIdx)
}
return n
}
@ -454,27 +516,32 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
var funcs []pipeStatsFunc
for {
var f pipeStatsFunc
sf, err := parseStatsFunc(lex)
if err != nil {
return nil, err
}
/*
if lex.isKeyword("if") {
ifQuery, err := parseIfQuery(lex)
if err != nil {
return fmt.Errorf("cannot parse 'if' query for %s: %w", sf, err)
}
f.f = sf
neededFields := newFieldsSet()
f.f.updateNeededFields(neededFields)
f.neededFieldsForFunc = neededFields.getAll()
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", sf, err)
}
*/
f.iff = iff
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for %s: %w", sf, err)
}
f.resultName = resultName
funcs = append(funcs, pipeStatsFunc{
f: sf,
resultName: resultName,
})
funcs = append(funcs, f)
if lex.isKeyword("|", ")", "") {
ps.funcs = funcs
@ -487,6 +554,26 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
}
}
// parseIfFilter parses `if (<filter>)` starting at the current lex token
// and returns the parsed filter.
//
// The lexer is advanced past the closing ')' on success.
// An error is returned if the `if` keyword, the opening '(' or the closing ')'
// is missing, or if the filter itself cannot be parsed.
func parseIfFilter(lex *lexer) (filter, error) {
	// Consume `if`
	if !lex.isKeyword("if") {
		return nil, fmt.Errorf("unexpected keyword %q; expecting 'if'", lex.token)
	}
	lex.nextToken()

	// Consume `(`
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("unexpected token %q after 'if'; expecting '('", lex.token)
	}
	lex.nextToken()

	// Parse the filter body
	iff, err := parseFilter(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'if' filter: %w", err)
	}

	// Consume `)`
	if !lex.isKeyword(")") {
		return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token)
	}
	lex.nextToken()

	return iff, nil
}
func parseStatsFunc(lex *lexer) (statsFunc, error) {
switch {
case lex.isKeyword("count"):

View file

@ -18,9 +18,12 @@ func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}
shards := make([]pipeTopkProcessorShard, workersCount)
for i := range shards {
shard := &shards[i]
shard.ps = ps
shard.stateSizeBudget = stateSizeBudgetChunk
shards[i] = pipeTopkProcessorShard{
pipeTopkProcessorShardNopad: pipeTopkProcessorShardNopad{
ps: ps,
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}

View file

@ -49,10 +49,13 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c
shards := make([]pipeUniqProcessorShard, workersCount)
for i := range shards {
shard := &shards[i]
shard.pu = pu
shard.m = make(map[string]struct{})
shard.stateSizeBudget = stateSizeBudgetChunk
shards[i] = pipeUniqProcessorShard{
pipeUniqProcessorShardNopad: pipeUniqProcessorShardNopad{
pu: pu,
m: make(map[string]struct{}),
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}

View file

@ -15,8 +15,8 @@ func (sa *statsAvg) String() string {
return "avg(" + fieldNamesString(sa.fields) + ")"
}
func (sa *statsAvg) neededFields() []string {
return sa.fields
func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sa.fields)
}
func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) {

View file

@ -17,12 +17,12 @@ func (sc *statsCount) String() string {
return "count(" + fieldNamesString(sc.fields) + ")"
}
func (sc *statsCount) neededFields() []string {
func (sc *statsCount) updateNeededFields(neededFields fieldsSet) {
if sc.containsStar {
// There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps)
return nil
return
}
return sc.fields
neededFields.addAll(sc.fields)
}
func (sc *statsCount) newStatsProcessor() (statsProcessor, int) {

View file

@ -17,8 +17,8 @@ func (sc *statsCountEmpty) String() string {
return "count_empty(" + fieldNamesString(sc.fields) + ")"
}
func (sc *statsCountEmpty) neededFields() []string {
return sc.fields
func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sc.fields)
}
func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) {

View file

@ -24,8 +24,8 @@ func (su *statsCountUniq) String() string {
return s
}
func (su *statsCountUniq) neededFields() []string {
return su.fields
func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(su.fields)
}
func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) {

View file

@ -19,8 +19,8 @@ func (sm *statsMax) String() string {
return "max(" + fieldNamesString(sm.fields) + ")"
}
func (sm *statsMax) neededFields() []string {
return sm.fields
func (sm *statsMax) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sm.fields)
}
func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
@ -124,23 +124,23 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
}
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.ch.maxValue)
bb.B = marshalUint64String(bb.B[:0], c.maxValue)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeFloat64:
f := math.Float64frombits(c.ch.maxValue)
f := math.Float64frombits(c.maxValue)
bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeIPv4:
bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.maxValue))
bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue))
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeTimestampISO8601:
bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.maxValue))
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue))
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
default:

View file

@ -14,8 +14,8 @@ func (sm *statsMedian) String() string {
return "median(" + fieldNamesString(sm.fields) + ")"
}
func (sm *statsMedian) neededFields() []string {
return sm.fields
func (sm *statsMedian) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sm.fields)
}
func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) {

View file

@ -19,8 +19,8 @@ func (sm *statsMin) String() string {
return "min(" + fieldNamesString(sm.fields) + ")"
}
func (sm *statsMin) neededFields() []string {
return sm.fields
func (sm *statsMin) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sm.fields)
}
func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
@ -124,23 +124,23 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
}
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.ch.minValue)
bb.B = marshalUint64String(bb.B[:0], c.minValue)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeFloat64:
f := math.Float64frombits(c.ch.minValue)
f := math.Float64frombits(c.minValue)
bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeIPv4:
bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.minValue))
bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue))
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeTimestampISO8601:
bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.minValue))
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue))
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
default:

View file

@ -24,8 +24,8 @@ func (sq *statsQuantile) String() string {
return fmt.Sprintf("quantile(%g, %s)", sq.phi, fieldNamesString(sq.fields))
}
func (sq *statsQuantile) neededFields() []string {
return sq.fields
func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sq.fields)
}
func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {

View file

@ -16,8 +16,8 @@ func (ss *statsSum) String() string {
return "sum(" + fieldNamesString(ss.fields) + ")"
}
func (ss *statsSum) neededFields() []string {
return ss.fields
func (ss *statsSum) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(ss.fields)
}
func (ss *statsSum) newStatsProcessor() (statsProcessor, int) {

View file

@ -15,8 +15,8 @@ func (ss *statsSumLen) String() string {
return "sum_len(" + fieldNamesString(ss.fields) + ")"
}
func (ss *statsSumLen) neededFields() []string {
return ss.fields
func (ss *statsSumLen) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(ss.fields)
}
func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) {

View file

@ -24,8 +24,8 @@ func (su *statsUniqValues) String() string {
return s
}
func (su *statsUniqValues) neededFields() []string {
return su.fields
func (su *statsUniqValues) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(su.fields)
}
func (su *statsUniqValues) newStatsProcessor() (statsProcessor, int) {

View file

@ -21,8 +21,8 @@ func (sv *statsValues) String() string {
return s
}
func (sv *statsValues) neededFields() []string {
return sv.fields
func (sv *statsValues) updateNeededFields(neededFields fieldsSet) {
neededFields.addAll(sv.fields)
}
func (sv *statsValues) newStatsProcessor() (statsProcessor, int) {

View file

@ -470,9 +470,6 @@ func TestStorageSearch(t *testing.T) {
}
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
if !br.streamID.tenantID.equal(&tenantID) {
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
}
rowsCountTotal.Add(uint32(len(br.timestamps)))
}
s.search(workersCount, so, nil, processBlock)
@ -535,9 +532,6 @@ func TestStorageSearch(t *testing.T) {
}
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
if !br.streamID.tenantID.equal(&tenantID) {
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
}
rowsCountTotal.Add(uint32(len(br.timestamps)))
}
s.search(workersCount, so, nil, processBlock)
@ -564,9 +558,6 @@ func TestStorageSearch(t *testing.T) {
}
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
if !br.streamID.tenantID.equal(&tenantID) {
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
}
rowsCountTotal.Add(uint32(len(br.timestamps)))
}
s.search(workersCount, so, nil, processBlock)
@ -601,9 +592,6 @@ func TestStorageSearch(t *testing.T) {
}
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
if !br.streamID.tenantID.equal(&tenantID) {
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
}
rowsCountTotal.Add(uint32(len(br.timestamps)))
}
s.search(workersCount, so, nil, processBlock)

View file

@ -3,6 +3,7 @@ package logstorage
import (
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -14,6 +15,29 @@ type StreamFilter struct {
orFilters []*andStreamFilter
}
// matchStreamName reports whether the stream name s satisfies sf.
//
// s is parsed with streamName.parse; false is returned if the parsing fails.
// Otherwise sf matches when at least one of its orFilters has all the tag filters
// matching the parsed stream name.
func (sf *StreamFilter) matchStreamName(s string) bool {
	sn := getStreamName()
	defer putStreamName(sn)

	if ok := sn.parse(s); !ok {
		return false
	}

nextOrFilter:
	for _, af := range sf.orFilters {
		for _, tf := range af.tagFilters {
			if !sn.match(tf) {
				// A single failed tag filter rejects the whole and-group.
				continue nextOrFilter
			}
		}
		// Every tag filter in af matched; this includes af without tag filters.
		return true
	}
	return false
}
func (sf *StreamFilter) isEmpty() bool {
for _, af := range sf.orFilters {
if len(af.tagFilters) > 0 {
@ -69,10 +93,96 @@ type streamTagFilter struct {
regexp *regexutil.PromRegex
}
// getRegexp returns the regexp stored in tf.
func (tf *streamTagFilter) getRegexp() *regexutil.PromRegex {
	return tf.regexp
}
// String returns the string representation of tf.
//
// It consists of the tag name passed through quoteTokenIfNeeded,
// followed by the filter operation and the quoted filter value.
func (tf *streamTagFilter) String() string {
	name := quoteTokenIfNeeded(tf.tagName)
	value := strconv.Quote(tf.value)
	return name + tf.op + value
}
// getStreamName returns a streamName taken from streamNamePool.
//
// A new streamName is allocated when the pool returns nothing.
// Pass the returned streamName to putStreamName when it is no longer needed.
func getStreamName() *streamName {
	if v := streamNamePool.Get(); v != nil {
		return v.(*streamName)
	}
	return &streamName{}
}
// putStreamName resets sn and puts it into streamNamePool.
//
// sn must not be used by the caller after this call.
func putStreamName(sn *streamName) {
	// Reset before pooling, so the pooled object does not keep references to parsed tags.
	sn.reset()
	streamNamePool.Put(sn)
}
// streamNamePool holds streamName objects for reuse by getStreamName and putStreamName.
var streamNamePool sync.Pool
// streamName holds the tags obtained from a stream name string by parse.
type streamName struct {
	// tags contains the parsed tags in the order parse appended them.
	tags []Field
}
// reset clears sn, so it can be re-used for parsing another stream name.
//
// The underlying tags storage is zeroed and kept for subsequent re-use.
func (sn *streamName) reset() {
	tags := sn.tags
	// Zero the items, so they stop referencing the previously parsed strings.
	clear(tags)
	sn.tags = tags[:0]
}
// parse parses s in the form {name1="value1",...,nameN="valueN"} and appends the parsed tags to sn.tags.
//
// Tag values must be double-quoted strings in Go syntax; they are unquoted before being stored.
// An empty tag set - {} - is valid.
//
// It returns true on success. It returns false if s has unexpected format;
// sn.tags may contain the tags parsed before the failure in this case.
func (sn *streamName) parse(s string) bool {
	if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' {
		return false
	}
	s = s[1 : len(s)-1]
	if len(s) == 0 {
		return true
	}

	for {
		// Parse tag name
		n := strings.IndexByte(s, '=')
		if n < 0 {
			// cannot find tag name
			return false
		}
		name := s[:n]
		s = s[n+1:]

		// Parse tag value.
		// strconv.QuotedPrefix also accepts back-quoted and single-quoted literals,
		// so make sure the value starts with a double quote.
		if len(s) == 0 || s[0] != '"' {
			return false
		}
		qPrefix, err := strconv.QuotedPrefix(s)
		if err != nil {
			return false
		}
		s = s[len(qPrefix):]
		value, err := strconv.Unquote(qPrefix)
		if err != nil {
			return false
		}

		sn.tags = append(sn.tags, Field{
			Name:  name,
			Value: value,
		})

		if len(s) == 0 {
			return true
		}
		if s[0] != ',' {
			return false
		}
		// Skip the comma, so it does not become a part of the next tag name.
		s = s[1:]
	}
}
// match reports whether sn satisfies the tag filter tf.
//
// The value of the first tag in sn with the name tf.tagName is compared to tf
// according to tf.op ("=", "!=", "=~" or "!~"). A tag missing from sn is treated
// as a tag with an empty value, so negative filters ("!=" and "!~") as well as
// filters on an empty value match stream names without the given tag.
//
// false is returned for an unknown tf.op.
func (sn *streamName) match(tf *streamTagFilter) bool {
	// Missing tag is equivalent to a tag with an empty value.
	value := ""
	for i := range sn.tags {
		if t := &sn.tags[i]; t.Name == tf.tagName {
			value = t.Value
			break
		}
	}

	switch tf.op {
	case "=":
		return value == tf.value
	case "!=":
		return value != tf.value
	case "=~":
		return tf.regexp.MatchString(value)
	case "!~":
		return !tf.regexp.MatchString(value)
	default:
		return false
	}
}

View file

@ -2004,7 +2004,7 @@ func createAllIndexesForMetricName(is *indexSearch, mn *MetricName, tsid *TSID,
}
func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID, date uint64) {
// Store the TSID for for the current indexdb into cache,
// Store the TSID for the current indexdb into cache,
// so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(genTSID, metricNameRaw)