mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
0d0804a202
commit
f7dad8bd61
11 changed files with 133 additions and 163 deletions
|
@ -1,9 +1,6 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// filter must implement filtering for log entries.
|
||||
type filter interface {
|
||||
// String returns string representation of the filter
|
||||
String() string
|
||||
|
@ -11,51 +8,3 @@ type filter interface {
|
|||
// apply must update bm according to the filter applied to the given bs block
|
||||
apply(bs *blockSearch, bm *bitmap)
|
||||
}
|
||||
|
||||
// streamFilter is the filter for `_stream:{...}`
|
||||
type streamFilter struct {
|
||||
// f is the filter to apply
|
||||
f *StreamFilter
|
||||
|
||||
// tenantIDs is the list of tenantIDs to search for streamIDs.
|
||||
tenantIDs []TenantID
|
||||
|
||||
// idb is the indexdb to search for streamIDs.
|
||||
idb *indexdb
|
||||
|
||||
streamIDsOnce sync.Once
|
||||
streamIDs map[streamID]struct{}
|
||||
}
|
||||
|
||||
func (fs *streamFilter) String() string {
|
||||
s := fs.f.String()
|
||||
if s == "{}" {
|
||||
return ""
|
||||
}
|
||||
return "_stream:" + s
|
||||
}
|
||||
|
||||
func (fs *streamFilter) getStreamIDs() map[streamID]struct{} {
|
||||
fs.streamIDsOnce.Do(fs.initStreamIDs)
|
||||
return fs.streamIDs
|
||||
}
|
||||
|
||||
func (fs *streamFilter) initStreamIDs() {
|
||||
streamIDs := fs.idb.searchStreamIDs(fs.tenantIDs, fs.f)
|
||||
m := make(map[streamID]struct{}, len(streamIDs))
|
||||
for i := range streamIDs {
|
||||
m[streamIDs[i]] = struct{}{}
|
||||
}
|
||||
fs.streamIDs = m
|
||||
}
|
||||
|
||||
func (fs *streamFilter) apply(bs *blockSearch, bm *bitmap) {
|
||||
if fs.f.isEmpty() {
|
||||
return
|
||||
}
|
||||
streamIDs := fs.getStreamIDs()
|
||||
if _, ok := streamIDs[bs.bsw.bh.streamID]; !ok {
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
53
lib/logstorage/filter_stream.go
Normal file
53
lib/logstorage/filter_stream.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// filterStream is the filter for `_stream:{...}`
|
||||
type filterStream struct {
|
||||
// f is the filter to apply
|
||||
f *StreamFilter
|
||||
|
||||
// tenantIDs is the list of tenantIDs to search for streamIDs.
|
||||
tenantIDs []TenantID
|
||||
|
||||
// idb is the indexdb to search for streamIDs.
|
||||
idb *indexdb
|
||||
|
||||
streamIDsOnce sync.Once
|
||||
streamIDs map[streamID]struct{}
|
||||
}
|
||||
|
||||
func (fs *filterStream) String() string {
|
||||
s := fs.f.String()
|
||||
if s == "{}" {
|
||||
return ""
|
||||
}
|
||||
return "_stream:" + s
|
||||
}
|
||||
|
||||
func (fs *filterStream) getStreamIDs() map[streamID]struct{} {
|
||||
fs.streamIDsOnce.Do(fs.initStreamIDs)
|
||||
return fs.streamIDs
|
||||
}
|
||||
|
||||
func (fs *filterStream) initStreamIDs() {
|
||||
streamIDs := fs.idb.searchStreamIDs(fs.tenantIDs, fs.f)
|
||||
m := make(map[streamID]struct{}, len(streamIDs))
|
||||
for i := range streamIDs {
|
||||
m[streamIDs[i]] = struct{}{}
|
||||
}
|
||||
fs.streamIDs = m
|
||||
}
|
||||
|
||||
func (fs *filterStream) apply(bs *blockSearch, bm *bitmap) {
|
||||
if fs.f.isEmpty() {
|
||||
return
|
||||
}
|
||||
streamIDs := fs.getStreamIDs()
|
||||
if _, ok := streamIDs[bs.bsw.bh.streamID]; !ok {
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
}
|
|
@ -1,7 +1,6 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
|
@ -152,72 +151,6 @@ func TestComplexFilters(t *testing.T) {
|
|||
testFilterMatchForColumns(t, columns, f, "foo", []int{1, 3, 6})
|
||||
}
|
||||
|
||||
func TestStreamFilter(t *testing.T) {
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
values: []string{
|
||||
"a foo",
|
||||
"a foobar",
|
||||
"aa abc a",
|
||||
"ca afdf a,foobar baz",
|
||||
"a fddf foobarbaz",
|
||||
"",
|
||||
"a foobar",
|
||||
"a kjlkjf dfff",
|
||||
"a ТЕСТЙЦУК НГКШ ",
|
||||
"a !!,23.(!1)",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Match
|
||||
f := &filterExact{
|
||||
fieldName: "job",
|
||||
value: "foobar",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, f, "foo", []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
|
||||
// Mismatch
|
||||
f = &filterExact{
|
||||
fieldName: "job",
|
||||
value: "abc",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, f, "foo", nil)
|
||||
}
|
||||
|
||||
func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, expectedRowIdxs []int) {
|
||||
t.Helper()
|
||||
|
||||
// Create the test storage
|
||||
const storagePath = "testFilterMatchForTimestamps"
|
||||
cfg := &StorageConfig{}
|
||||
s := MustOpenStorage(storagePath, cfg)
|
||||
|
||||
// Generate rows
|
||||
getValue := func(rowIdx int) string {
|
||||
return fmt.Sprintf("some value for row %d", rowIdx)
|
||||
}
|
||||
tenantID := TenantID{
|
||||
AccountID: 123,
|
||||
ProjectID: 456,
|
||||
}
|
||||
generateRowsFromTimestamps(s, tenantID, timestamps, getValue)
|
||||
|
||||
expectedResults := make([]string, len(expectedRowIdxs))
|
||||
expectedTimestamps := make([]int64, len(expectedRowIdxs))
|
||||
for i, idx := range expectedRowIdxs {
|
||||
expectedResults[i] = getValue(idx)
|
||||
expectedTimestamps[i] = timestamps[idx]
|
||||
}
|
||||
|
||||
testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)
|
||||
|
||||
// Close and delete the test storage
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
}
|
||||
|
||||
func testFilterMatchForColumns(t *testing.T, columns []column, f filter, resultColumnName string, expectedRowIdxs []int) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -317,17 +250,3 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
|
|||
s.MustAddRows(lr)
|
||||
PutLogRows(lr)
|
||||
}
|
||||
|
||||
func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) {
|
||||
lr := GetLogRows(nil, nil)
|
||||
var fields []Field
|
||||
for i, timestamp := range timestamps {
|
||||
fields = append(fields[:0], Field{
|
||||
Name: "_msg",
|
||||
Value: getValue(i),
|
||||
})
|
||||
lr.MustAdd(tenantID, timestamp, fields)
|
||||
}
|
||||
s.MustAddRows(lr)
|
||||
PutLogRows(lr)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
func TestFilterTime(t *testing.T) {
|
||||
|
@ -81,3 +84,49 @@ func TestFilterTime(t *testing.T) {
|
|||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, nil)
|
||||
}
|
||||
|
||||
func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, expectedRowIdxs []int) {
|
||||
t.Helper()
|
||||
|
||||
// Create the test storage
|
||||
const storagePath = "testFilterMatchForTimestamps"
|
||||
cfg := &StorageConfig{}
|
||||
s := MustOpenStorage(storagePath, cfg)
|
||||
|
||||
// Generate rows
|
||||
getValue := func(rowIdx int) string {
|
||||
return fmt.Sprintf("some value for row %d", rowIdx)
|
||||
}
|
||||
tenantID := TenantID{
|
||||
AccountID: 123,
|
||||
ProjectID: 456,
|
||||
}
|
||||
generateRowsFromTimestamps(s, tenantID, timestamps, getValue)
|
||||
|
||||
expectedResults := make([]string, len(expectedRowIdxs))
|
||||
expectedTimestamps := make([]int64, len(expectedRowIdxs))
|
||||
for i, idx := range expectedRowIdxs {
|
||||
expectedResults[i] = getValue(idx)
|
||||
expectedTimestamps[i] = timestamps[idx]
|
||||
}
|
||||
|
||||
testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)
|
||||
|
||||
// Close and delete the test storage
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
}
|
||||
|
||||
func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) {
|
||||
lr := GetLogRows(nil, nil)
|
||||
var fields []Field
|
||||
for i, timestamp := range timestamps {
|
||||
fields = append(fields[:0], Field{
|
||||
Name: "_msg",
|
||||
Value: getValue(i),
|
||||
})
|
||||
lr.MustAdd(tenantID, timestamp, fields)
|
||||
}
|
||||
s.MustAddRows(lr)
|
||||
PutLogRows(lr)
|
||||
}
|
||||
|
|
|
@ -51,9 +51,9 @@ type indexdb struct {
|
|||
// streamsCreatedTotal is the number of log streams created since the indexdb intialization.
|
||||
streamsCreatedTotal atomic.Uint64
|
||||
|
||||
// the generation of the streamFilterCache.
|
||||
// the generation of the filterStreamCache.
|
||||
// It is updated each time new item is added to tb.
|
||||
streamFilterCacheGeneration atomic.Uint32
|
||||
filterStreamCacheGeneration atomic.Uint32
|
||||
|
||||
// path is the path to indexdb
|
||||
path string
|
||||
|
@ -482,11 +482,11 @@ func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical [
|
|||
func (idb *indexdb) invalidateStreamFilterCache() {
|
||||
// This function must be fast, since it is called each
|
||||
// time new indexdb entry is added.
|
||||
idb.streamFilterCacheGeneration.Add(1)
|
||||
idb.filterStreamCacheGeneration.Add(1)
|
||||
}
|
||||
|
||||
func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
|
||||
dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration.Load())
|
||||
dst = encoding.MarshalUint32(dst, idb.filterStreamCacheGeneration.Load())
|
||||
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
|
||||
for i := range tenantIDs {
|
||||
|
@ -499,7 +499,7 @@ func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID
|
|||
func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
|
||||
bb := bbPool.Get()
|
||||
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
||||
data := idb.s.streamFilterCache.GetBig(nil, bb.B)
|
||||
data := idb.s.filterStreamCache.GetBig(nil, bb.B)
|
||||
bbPool.Put(bb)
|
||||
if len(data) == 0 {
|
||||
// Cache miss
|
||||
|
@ -536,7 +536,7 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
|
|||
// Store marshaled streamIDs to cache.
|
||||
bb := bbPool.Get()
|
||||
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
||||
idb.s.streamFilterCache.SetBig(bb.B, b)
|
||||
idb.s.filterStreamCache.SetBig(bb.B, b)
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
|
|
|
@ -48,9 +48,9 @@ func TestStorageSearchStreamIDs(t *testing.T) {
|
|||
}
|
||||
idb.debugFlush()
|
||||
|
||||
f := func(streamFilter string, expectedStreamIDs []streamID) {
|
||||
f := func(filterStream string, expectedStreamIDs []streamID) {
|
||||
t.Helper()
|
||||
sf := mustNewStreamFilter(streamFilter)
|
||||
sf := mustNewStreamFilter(filterStream)
|
||||
if expectedStreamIDs == nil {
|
||||
expectedStreamIDs = []streamID{}
|
||||
}
|
||||
|
|
|
@ -434,7 +434,7 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error)
|
|||
case "_time":
|
||||
return parseFilterTimeWithOffset(lex)
|
||||
case "_stream":
|
||||
return parseStreamFilter(lex)
|
||||
return parseFilterStream(lex)
|
||||
default:
|
||||
return parseGenericFilter(lex, fieldName)
|
||||
}
|
||||
|
@ -983,7 +983,7 @@ func stripTimezoneSuffix(s string) string {
|
|||
return s[:len(s)-len(tz)]
|
||||
}
|
||||
|
||||
func parseStreamFilter(lex *lexer) (*streamFilter, error) {
|
||||
func parseFilterStream(lex *lexer) (*filterStream, error) {
|
||||
if !lex.isKeyword("{") {
|
||||
return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token)
|
||||
}
|
||||
|
@ -1000,12 +1000,12 @@ func parseStreamFilter(lex *lexer) (*streamFilter, error) {
|
|||
switch {
|
||||
case lex.isKeyword("}"):
|
||||
lex.nextToken()
|
||||
sf := &streamFilter{
|
||||
fs := &filterStream{
|
||||
f: &StreamFilter{
|
||||
orFilters: filters,
|
||||
},
|
||||
}
|
||||
return sf, nil
|
||||
return fs, nil
|
||||
case lex.isKeyword("or"):
|
||||
if !lex.mustNextToken() {
|
||||
return nil, fmt.Errorf("incomplete _stream filter after 'or'")
|
||||
|
@ -1024,11 +1024,11 @@ func newStreamFilter(s string) (*StreamFilter, error) {
|
|||
if !lex.mustNextToken() {
|
||||
return nil, fmt.Errorf("missing '{' in _stream filter")
|
||||
}
|
||||
sf, err := parseStreamFilter(lex)
|
||||
fs, err := parseFilterStream(lex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sf.f, nil
|
||||
return fs.f, nil
|
||||
}
|
||||
|
||||
func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) {
|
||||
|
|
|
@ -172,16 +172,16 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) {
|
|||
// When the storage is no longer needed, closeTestStorage() must be called.
|
||||
func newTestStorage() *Storage {
|
||||
streamIDCache := workingsetcache.New(1024 * 1024)
|
||||
streamFilterCache := workingsetcache.New(1024 * 1024)
|
||||
filterStreamCache := workingsetcache.New(1024 * 1024)
|
||||
return &Storage{
|
||||
flushInterval: time.Second,
|
||||
streamIDCache: streamIDCache,
|
||||
streamFilterCache: streamFilterCache,
|
||||
filterStreamCache: filterStreamCache,
|
||||
}
|
||||
}
|
||||
|
||||
// closeTestStorage closes storage created via newTestStorage().
|
||||
func closeTestStorage(s *Storage) {
|
||||
s.streamIDCache.Stop()
|
||||
s.streamFilterCache.Stop()
|
||||
s.filterStreamCache.Stop()
|
||||
}
|
||||
|
|
|
@ -133,10 +133,10 @@ type Storage struct {
|
|||
// when StreamTags must be found for the particular streamID
|
||||
streamTagsCache *workingsetcache.Cache
|
||||
|
||||
// streamFilterCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
|
||||
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
|
||||
//
|
||||
// It reduces the load on persistent storage during querying by _stream:{...} filter.
|
||||
streamFilterCache *workingsetcache.Cache
|
||||
filterStreamCache *workingsetcache.Cache
|
||||
}
|
||||
|
||||
type partitionWrapper struct {
|
||||
|
@ -244,7 +244,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
|
|||
|
||||
streamTagsCache := workingsetcache.New(mem / 10)
|
||||
|
||||
streamFilterCache := workingsetcache.New(mem / 10)
|
||||
filterStreamCache := workingsetcache.New(mem / 10)
|
||||
|
||||
s := &Storage{
|
||||
path: path,
|
||||
|
@ -259,7 +259,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
|
|||
|
||||
streamIDCache: streamIDCache,
|
||||
streamTagsCache: streamTagsCache,
|
||||
streamFilterCache: streamFilterCache,
|
||||
filterStreamCache: filterStreamCache,
|
||||
}
|
||||
|
||||
partitionsPath := filepath.Join(path, partitionsDirname)
|
||||
|
@ -397,8 +397,8 @@ func (s *Storage) MustClose() {
|
|||
s.streamTagsCache.Stop()
|
||||
s.streamTagsCache = nil
|
||||
|
||||
s.streamFilterCache.Stop()
|
||||
s.streamFilterCache = nil
|
||||
s.filterStreamCache.Stop()
|
||||
s.filterStreamCache = nil
|
||||
|
||||
// release lock file
|
||||
fs.MustClose(s.flockF)
|
||||
|
|
|
@ -268,7 +268,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
|||
}
|
||||
s.partitionsLock.Unlock()
|
||||
|
||||
// Obtain common streamFilter from f
|
||||
// Obtain common filterStream from f
|
||||
var sf *StreamFilter
|
||||
sf, f = getCommonStreamFilter(f)
|
||||
|
||||
|
@ -345,7 +345,7 @@ func hasStreamFilters(f filter) bool {
|
|||
return hasStreamFiltersInList(t.filters)
|
||||
case *filterNot:
|
||||
return hasStreamFilters(t.f)
|
||||
case *streamFilter:
|
||||
case *filterStream:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
@ -375,8 +375,8 @@ func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter {
|
|||
return &filterNot{
|
||||
f: initStreamFilters(tenantIDs, idb, t.f),
|
||||
}
|
||||
case *streamFilter:
|
||||
return &streamFilter{
|
||||
case *filterStream:
|
||||
return &filterStream{
|
||||
f: t.f,
|
||||
tenantIDs: tenantIDs,
|
||||
idb: idb,
|
||||
|
@ -698,7 +698,7 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
|
|||
case *filterAnd:
|
||||
filters := t.filters
|
||||
for i, filter := range filters {
|
||||
sf, ok := filter.(*streamFilter)
|
||||
sf, ok := filter.(*filterStream)
|
||||
if ok && !sf.f.isEmpty() {
|
||||
// Remove sf from filters, since it doesn't filter out anything then.
|
||||
fa := &filterAnd{
|
||||
|
@ -707,7 +707,7 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
|
|||
return sf.f, fa
|
||||
}
|
||||
}
|
||||
case *streamFilter:
|
||||
case *filterStream:
|
||||
return t.f, &filterNoop{}
|
||||
}
|
||||
return nil, f
|
||||
|
|
|
@ -384,7 +384,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
maxTimestamp: maxTimestamp,
|
||||
})
|
||||
if sf != nil {
|
||||
filters = append(filters, &streamFilter{
|
||||
filters = append(filters, &filterStream{
|
||||
f: sf,
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue