From f7dad8bd610c43452d2bd8a2a1eb6ef4714e9572 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Apr 2024 08:05:53 +0200 Subject: [PATCH] wip --- lib/logstorage/filter.go | 53 +----------------- lib/logstorage/filter_stream.go | 53 ++++++++++++++++++ lib/logstorage/filter_test.go | 81 --------------------------- lib/logstorage/filter_time_test.go | 49 ++++++++++++++++ lib/logstorage/indexdb.go | 12 ++-- lib/logstorage/indexdb_test.go | 4 +- lib/logstorage/parser.go | 12 ++-- lib/logstorage/partition_test.go | 6 +- lib/logstorage/storage.go | 12 ++-- lib/logstorage/storage_search.go | 12 ++-- lib/logstorage/storage_search_test.go | 2 +- 11 files changed, 133 insertions(+), 163 deletions(-) create mode 100644 lib/logstorage/filter_stream.go diff --git a/lib/logstorage/filter.go b/lib/logstorage/filter.go index 20e62a32d..809c17893 100644 --- a/lib/logstorage/filter.go +++ b/lib/logstorage/filter.go @@ -1,9 +1,6 @@ package logstorage -import ( - "sync" -) - +// filter must implement filtering for log entries. type filter interface { // String returns string representation of the filter String() string @@ -11,51 +8,3 @@ type filter interface { // apply must update bm according to the filter applied to the given bs block apply(bs *blockSearch, bm *bitmap) } - -// streamFilter is the filter for `_stream:{...}` -type streamFilter struct { - // f is the filter to apply - f *StreamFilter - - // tenantIDs is the list of tenantIDs to search for streamIDs. - tenantIDs []TenantID - - // idb is the indexdb to search for streamIDs. - idb *indexdb - - streamIDsOnce sync.Once - streamIDs map[streamID]struct{} -} - -func (fs *streamFilter) String() string { - s := fs.f.String() - if s == "{}" { - return "" - } - return "_stream:" + s -} - -func (fs *streamFilter) getStreamIDs() map[streamID]struct{} { - fs.streamIDsOnce.Do(fs.initStreamIDs) - return fs.streamIDs -} - -func (fs *streamFilter) initStreamIDs() { - streamIDs := fs.idb.searchStreamIDs(fs.tenantIDs, fs.f) - m := make(map[streamID]struct{}, len(streamIDs)) - for i := range streamIDs { - m[streamIDs[i]] = struct{}{} - } - fs.streamIDs = m -} - -func (fs *streamFilter) apply(bs *blockSearch, bm *bitmap) { - if fs.f.isEmpty() { - return - } - streamIDs := fs.getStreamIDs() - if _, ok := streamIDs[bs.bsw.bh.streamID]; !ok { - bm.resetBits() - return - } -} diff --git a/lib/logstorage/filter_stream.go b/lib/logstorage/filter_stream.go new file mode 100644 index 000000000..9618dc3a2 --- /dev/null +++ b/lib/logstorage/filter_stream.go @@ -0,0 +1,53 @@ +package logstorage + +import ( + "sync" +) + +// filterStream is the filter for `_stream:{...}` +type filterStream struct { + // f is the filter to apply + f *StreamFilter + + // tenantIDs is the list of tenantIDs to search for streamIDs. + tenantIDs []TenantID + + // idb is the indexdb to search for streamIDs. 
+	idb *indexdb
+
+	streamIDsOnce sync.Once
+	streamIDs     map[streamID]struct{}
+}
+
+func (fs *filterStream) String() string {
+	s := fs.f.String()
+	if s == "{}" {
+		return ""
+	}
+	return "_stream:" + s
+}
+
+func (fs *filterStream) getStreamIDs() map[streamID]struct{} {
+	fs.streamIDsOnce.Do(fs.initStreamIDs)
+	return fs.streamIDs
+}
+
+func (fs *filterStream) initStreamIDs() {
+	streamIDs := fs.idb.searchStreamIDs(fs.tenantIDs, fs.f)
+	m := make(map[streamID]struct{}, len(streamIDs))
+	for i := range streamIDs {
+		m[streamIDs[i]] = struct{}{}
+	}
+	fs.streamIDs = m
+}
+
+func (fs *filterStream) apply(bs *blockSearch, bm *bitmap) {
+	if fs.f.isEmpty() {
+		return
+	}
+	streamIDs := fs.getStreamIDs()
+	if _, ok := streamIDs[bs.bsw.bh.streamID]; !ok {
+		bm.resetBits()
+		return
+	}
+}
diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go
index 82a96964a..585099f5a 100644
--- a/lib/logstorage/filter_test.go
+++ b/lib/logstorage/filter_test.go
@@ -1,7 +1,6 @@
 package logstorage
 
 import (
-	"fmt"
 	"reflect"
 	"testing"
 
@@ -152,72 +151,6 @@ func TestComplexFilters(t *testing.T) {
 	testFilterMatchForColumns(t, columns, f, "foo", []int{1, 3, 6})
 }
 
-func TestStreamFilter(t *testing.T) {
-	columns := []column{
-		{
-			name: "foo",
-			values: []string{
-				"a foo",
-				"a foobar",
-				"aa abc a",
-				"ca afdf a,foobar baz",
-				"a fddf foobarbaz",
-				"",
-				"a foobar",
-				"a kjlkjf dfff",
-				"a ТЕСТЙЦУК НГКШ ",
-				"a !!,23.(!1)",
-			},
-		},
-	}
-
-	// Match
-	f := &filterExact{
-		fieldName: "job",
-		value:     "foobar",
-	}
-	testFilterMatchForColumns(t, columns, f, "foo", []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
-
-	// Mismatch
-	f = &filterExact{
-		fieldName: "job",
-		value:     "abc",
-	}
-	testFilterMatchForColumns(t, columns, f, "foo", nil)
-}
-
-func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, expectedRowIdxs []int) {
-	t.Helper()
-
-	// Create the test storage
-	const storagePath = "testFilterMatchForTimestamps"
-	cfg := &StorageConfig{}
-	s := MustOpenStorage(storagePath, cfg)
-
-	// Generate rows
-	getValue := func(rowIdx int) string {
-		return fmt.Sprintf("some value for row %d", rowIdx)
-	}
-	tenantID := TenantID{
-		AccountID: 123,
-		ProjectID: 456,
-	}
-	generateRowsFromTimestamps(s, tenantID, timestamps, getValue)
-
-	expectedResults := make([]string, len(expectedRowIdxs))
-	expectedTimestamps := make([]int64, len(expectedRowIdxs))
-	for i, idx := range expectedRowIdxs {
-		expectedResults[i] = getValue(idx)
-		expectedTimestamps[i] = timestamps[idx]
-	}
-
-	testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)
-
-	// Close and delete the test storage
-	s.MustClose()
-	fs.MustRemoveAll(storagePath)
-}
-
 func testFilterMatchForColumns(t *testing.T, columns []column, f filter, resultColumnName string, expectedRowIdxs []int) {
 	t.Helper()
 
@@ -317,17 +250,3 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
 	s.MustAddRows(lr)
 	PutLogRows(lr)
 }
-
-func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) {
-	lr := GetLogRows(nil, nil)
-	var fields []Field
-	for i, timestamp := range timestamps {
-		fields = append(fields[:0], Field{
-			Name:  "_msg",
-			Value: getValue(i),
-		})
-		lr.MustAdd(tenantID, timestamp, fields)
-	}
-	s.MustAddRows(lr)
-	PutLogRows(lr)
-}
diff --git a/lib/logstorage/filter_time_test.go b/lib/logstorage/filter_time_test.go
index 6ac1e20c5..c0a7d3b2c 100644
--- a/lib/logstorage/filter_time_test.go
+++ b/lib/logstorage/filter_time_test.go
@@ -1,7 +1,10 @@
 package logstorage
 
 import (
+	"fmt"
 	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 )
 
 func TestFilterTime(t *testing.T) {
@@ -81,3 +84,49 @@ func TestFilterTime(t *testing.T) {
 	}
 	testFilterMatchForTimestamps(t, timestamps, ft, nil)
 }
+
+func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, expectedRowIdxs []int) {
+	t.Helper()
+
+	// Create the test storage
+	const storagePath = "testFilterMatchForTimestamps"
+	cfg := &StorageConfig{}
+	s := MustOpenStorage(storagePath, cfg)
+
+	// Generate rows
+	getValue := func(rowIdx int) string {
+		return fmt.Sprintf("some value for row %d", rowIdx)
+	}
+	tenantID := TenantID{
+		AccountID: 123,
+		ProjectID: 456,
+	}
+	generateRowsFromTimestamps(s, tenantID, timestamps, getValue)
+
+	expectedResults := make([]string, len(expectedRowIdxs))
+	expectedTimestamps := make([]int64, len(expectedRowIdxs))
+	for i, idx := range expectedRowIdxs {
+		expectedResults[i] = getValue(idx)
+		expectedTimestamps[i] = timestamps[idx]
+	}
+
+	testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)
+
+	// Close and delete the test storage
+	s.MustClose()
+	fs.MustRemoveAll(storagePath)
+}
+
+func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int64, getValue func(rowIdx int) string) {
+	lr := GetLogRows(nil, nil)
+	var fields []Field
+	for i, timestamp := range timestamps {
+		fields = append(fields[:0], Field{
+			Name:  "_msg",
+			Value: getValue(i),
+		})
+		lr.MustAdd(tenantID, timestamp, fields)
+	}
+	s.MustAddRows(lr)
+	PutLogRows(lr)
+}
diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go
index d71775258..f7c4c70b9 100644
--- a/lib/logstorage/indexdb.go
+++ b/lib/logstorage/indexdb.go
@@ -51,9 +51,9 @@ type indexdb struct {
 	// streamsCreatedTotal is the number of log streams created since the indexdb initialization.
 	streamsCreatedTotal atomic.Uint64
 
-	// the generation of the streamFilterCache.
+	// the generation of the filterStreamCache.
 	// It is updated each time new item is added to tb.
-	streamFilterCacheGeneration atomic.Uint32
+	filterStreamCacheGeneration atomic.Uint32
 
 	// path is the path to indexdb
 	path string
@@ -482,11 +482,11 @@ func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical [
 func (idb *indexdb) invalidateStreamFilterCache() {
 	// This function must be fast, since it is called each
 	// time new indexdb entry is added.
-	idb.streamFilterCacheGeneration.Add(1)
+	idb.filterStreamCacheGeneration.Add(1)
 }
 
 func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
-	dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration.Load())
+	dst = encoding.MarshalUint32(dst, idb.filterStreamCacheGeneration.Load())
 	dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
 	dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
 	for i := range tenantIDs {
@@ -499,7 +499,7 @@ func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID
 func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
 	bb := bbPool.Get()
 	bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
-	data := idb.s.streamFilterCache.GetBig(nil, bb.B)
+	data := idb.s.filterStreamCache.GetBig(nil, bb.B)
 	bbPool.Put(bb)
 	if len(data) == 0 {
 		// Cache miss
@@ -536,7 +536,7 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
 	// Store marshaled streamIDs to cache.
bb := bbPool.Get() bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf) - idb.s.streamFilterCache.SetBig(bb.B, b) + idb.s.filterStreamCache.SetBig(bb.B, b) bbPool.Put(bb) } diff --git a/lib/logstorage/indexdb_test.go b/lib/logstorage/indexdb_test.go index de92fda7c..2b3dbbfe8 100644 --- a/lib/logstorage/indexdb_test.go +++ b/lib/logstorage/indexdb_test.go @@ -48,9 +48,9 @@ func TestStorageSearchStreamIDs(t *testing.T) { } idb.debugFlush() - f := func(streamFilter string, expectedStreamIDs []streamID) { + f := func(filterStream string, expectedStreamIDs []streamID) { t.Helper() - sf := mustNewStreamFilter(streamFilter) + sf := mustNewStreamFilter(filterStream) if expectedStreamIDs == nil { expectedStreamIDs = []streamID{} } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index b989c2314..dc93e1eab 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -434,7 +434,7 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error) case "_time": return parseFilterTimeWithOffset(lex) case "_stream": - return parseStreamFilter(lex) + return parseFilterStream(lex) default: return parseGenericFilter(lex, fieldName) } @@ -983,7 +983,7 @@ func stripTimezoneSuffix(s string) string { return s[:len(s)-len(tz)] } -func parseStreamFilter(lex *lexer) (*streamFilter, error) { +func parseFilterStream(lex *lexer) (*filterStream, error) { if !lex.isKeyword("{") { return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token) } @@ -1000,12 +1000,12 @@ func parseStreamFilter(lex *lexer) (*streamFilter, error) { switch { case lex.isKeyword("}"): lex.nextToken() - sf := &streamFilter{ + fs := &filterStream{ f: &StreamFilter{ orFilters: filters, }, } - return sf, nil + return fs, nil case lex.isKeyword("or"): if !lex.mustNextToken() { return nil, fmt.Errorf("incomplete _stream filter after 'or'") @@ -1024,11 +1024,11 @@ func newStreamFilter(s string) (*StreamFilter, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing '{' in _stream filter") } - sf, err := parseStreamFilter(lex) + fs, err := parseFilterStream(lex) if err != nil { return nil, err } - return sf.f, nil + return fs.f, nil } func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) { diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go index 2bbee20ba..ea1af27a5 100644 --- a/lib/logstorage/partition_test.go +++ b/lib/logstorage/partition_test.go @@ -172,16 +172,16 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) { // When the storage is no longer needed, closeTestStorage() must be called. func newTestStorage() *Storage { streamIDCache := workingsetcache.New(1024 * 1024) - streamFilterCache := workingsetcache.New(1024 * 1024) + filterStreamCache := workingsetcache.New(1024 * 1024) return &Storage{ flushInterval: time.Second, streamIDCache: streamIDCache, - streamFilterCache: streamFilterCache, + filterStreamCache: filterStreamCache, } } // closeTestStorage closes storage created via newTestStorage(). func closeTestStorage(s *Storage) { s.streamIDCache.Stop() - s.streamFilterCache.Stop() + s.filterStreamCache.Stop() } diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index a5ce10739..139ed1e38 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -133,10 +133,10 @@ type Storage struct { // when StreamTags must be found for the particular streamID streamTagsCache *workingsetcache.Cache - // streamFilterCache caches streamIDs keyed by (partition, []TenanID, StreamFilter). 
+	// filterStreamCache caches streamIDs keyed by (partition, []TenantID, StreamFilter).
 	//
 	// It reduces the load on persistent storage during querying by _stream:{...} filter.
-	streamFilterCache *workingsetcache.Cache
+	filterStreamCache *workingsetcache.Cache
 }
 
 type partitionWrapper struct {
@@ -244,7 +244,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
 
 	streamTagsCache := workingsetcache.New(mem / 10)
 
-	streamFilterCache := workingsetcache.New(mem / 10)
+	filterStreamCache := workingsetcache.New(mem / 10)
 
 	s := &Storage{
 		path: path,
@@ -259,7 +259,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
 
 		streamIDCache:   streamIDCache,
 		streamTagsCache: streamTagsCache,
-		streamFilterCache: streamFilterCache,
+		filterStreamCache: filterStreamCache,
 	}
 
 	partitionsPath := filepath.Join(path, partitionsDirname)
@@ -397,8 +397,8 @@ func (s *Storage) MustClose() {
 	s.streamTagsCache.Stop()
 	s.streamTagsCache = nil
 
-	s.streamFilterCache.Stop()
-	s.streamFilterCache = nil
+	s.filterStreamCache.Stop()
+	s.filterStreamCache = nil
 
 	// release lock file
 	fs.MustClose(s.flockF)
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index e94b46972..d70dcfc32 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -268,7 +268,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 	}
 	s.partitionsLock.Unlock()
 
-	// Obtain common streamFilter from f
+	// Obtain common filterStream from f
 	var sf *StreamFilter
 	sf, f = getCommonStreamFilter(f)
 
@@ -345,7 +345,7 @@ func hasStreamFilters(f filter) bool {
 		return hasStreamFiltersInList(t.filters)
 	case *filterNot:
 		return hasStreamFilters(t.f)
-	case *streamFilter:
+	case *filterStream:
 		return true
 	default:
 		return false
@@ -375,8 +375,8 @@ func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter {
 		return &filterNot{
 			f: initStreamFilters(tenantIDs, idb, t.f),
 		}
-	case *streamFilter:
-		return &streamFilter{
+	case *filterStream:
+		return &filterStream{
 			f:         t.f,
 			tenantIDs: tenantIDs,
 			idb:       idb,
@@ -698,7 +698,7 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
 	case *filterAnd:
 		filters := t.filters
 		for i, filter := range filters {
-			sf, ok := filter.(*streamFilter)
+			sf, ok := filter.(*filterStream)
 			if ok && !sf.f.isEmpty() {
 				// Remove sf from filters, since it doesn't filter out anything then.
 				fa := &filterAnd{
@@ -707,7 +707,7 @@
 				return sf.f, fa
 			}
 		}
-	case *streamFilter:
+	case *filterStream:
 		return t.f, &filterNoop{}
 	}
 	return nil, f
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index 6c11fb29d..349b3b2f6 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -384,7 +384,7 @@ func TestStorageSearch(t *testing.T) {
 			maxTimestamp: maxTimestamp,
 		})
 		if sf != nil {
-			filters = append(filters, &filterStream{
+			filters = append(filters, &filterStream{
 				f: sf,
 			})
 		}
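
Note on the pattern this rename preserves: filterStream resolves the set of matching stream IDs lazily, so the expensive idb.searchStreamIDs() call runs at most once per filter instance via sync.Once, and apply() then reduces to a map-membership check per block. Below is a minimal, self-contained sketch of that memoization idiom; lazyStreamSet, lookup, and contains are hypothetical names chosen for illustration, not part of the VictoriaLogs API.

	package main

	import (
		"fmt"
		"sync"
	)

	type streamID uint64

	// lazyStreamSet memoizes the result of an expensive lookup so that
	// all callers share a single lookup invocation.
	type lazyStreamSet struct {
		lookup func() []streamID // stands in for an indexdb search

		once sync.Once
		ids  map[streamID]struct{}
	}

	// contains runs the lookup exactly once, builds the membership set,
	// and answers every subsequent query with a cheap map access.
	func (s *lazyStreamSet) contains(id streamID) bool {
		s.once.Do(func() {
			ids := s.lookup()
			m := make(map[streamID]struct{}, len(ids))
			for _, v := range ids {
				m[v] = struct{}{}
			}
			s.ids = m
		})
		_, ok := s.ids[id]
		return ok
	}

	func main() {
		calls := 0
		set := &lazyStreamSet{
			lookup: func() []streamID {
				calls++ // incremented once, no matter how many contains() calls follow
				return []streamID{1, 2, 3}
			},
		}
		fmt.Println(set.contains(2), set.contains(5), calls) // true false 1
	}

Under this pattern the first contains() call pays for the lookup; later and concurrent callers block on the sync.Once until the set is built and then hit only the map. That appears to be what lets apply() be invoked from many concurrent search workers without duplicating the indexdb search.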