package logstorage

import (
	"bytes"
	"path/filepath"
	"sort"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// PartitionStats contains stats for the partition.
type PartitionStats struct {
	DatadbStats
	IndexdbStats
}

type partition struct {
	// s is the parent storage for the partition
	s *Storage

	// path is the path to the partition directory
	path string

	// name is the partition name. It is basically the directory name obtained from path.
	// It is used for creating keys for partition caches.
	name string

	// idb is the indexdb used for the given partition
	idb *indexdb

	// ddb is the datadb used for the given partition
	ddb *datadb
}

// mustCreatePartition creates a partition at the given path.
//
// The created partition can be opened with mustOpenPartition() after it has been created.
//
// The created partition can be deleted with mustDeletePartition() when it is no longer needed.
func mustCreatePartition(path string) {
	fs.MustMkdirFailIfExist(path)

	indexdbPath := filepath.Join(path, indexdbDirname)
	mustCreateIndexdb(indexdbPath)

	datadbPath := filepath.Join(path, datadbDirname)
	mustCreateDatadb(datadbPath)
}

// mustDeletePartition deletes the partition at the given path.
//
// The partition must be closed with mustClosePartition() before deleting it.
func mustDeletePartition(path string) {
	fs.MustRemoveAll(path)
}

// mustOpenPartition opens the partition at the given path for the given Storage.
//
// The returned partition must be closed when no longer needed with a mustClosePartition() call.
func mustOpenPartition(s *Storage, path string) *partition {
	name := filepath.Base(path)

	// Open indexdb
	indexdbPath := filepath.Join(path, indexdbDirname)
	idb := mustOpenIndexdb(indexdbPath, name, s)

	// Start initializing the partition
	pt := &partition{
		s:    s,
		path: path,
		name: name,
		idb:  idb,
	}

	// Open datadb
	datadbPath := filepath.Join(path, datadbDirname)
	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval)

	return pt
}
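// The sketch below is an editor's illustration (not part of the original
// docs) of the partition lifecycle implied by the functions above, assuming
// a *Storage s and a partition directory path are already available. The
// must* helpers terminate the process on failure, so no error handling is
// needed:
//
//	mustCreatePartition(path)        // create the on-disk layout once
//	pt := mustOpenPartition(s, path) // open it for ingestion and queries
//	pt.mustAddRows(lr)               // ingest a LogRows batch
//	mustClosePartition(pt)           // release resources when done
//	mustDeletePartition(path)        // optionally drop the closed partition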
// mustClosePartition closes pt.
//
// The caller must ensure that pt is no longer used before the call to mustClosePartition().
//
// The partition can be deleted if needed after it is closed via a mustDeletePartition() call.
func mustClosePartition(pt *partition) {
	// Close indexdb
	mustCloseIndexdb(pt.idb)
	pt.idb = nil

	// Close datadb
	mustCloseDatadb(pt.ddb)
	pt.ddb = nil

	pt.name = ""
	pt.path = ""
	pt.s = nil
}

func (pt *partition) mustAddRows(lr *LogRows) {
	// Register rows in indexdb.
	//
	// Collect into pendingRows the indexes of rows whose streamID is missing
	// from the cache, skipping consecutive duplicates of the same streamID.
	var pendingRows []int
	streamIDs := lr.streamIDs
	for i := range lr.timestamps {
		streamID := &streamIDs[i]
		if pt.hasStreamIDInCache(streamID) {
			continue
		}
		if len(pendingRows) == 0 || !streamIDs[pendingRows[len(pendingRows)-1]].equal(streamID) {
			pendingRows = append(pendingRows, i)
		}
	}
	if len(pendingRows) > 0 {
		logNewStreams := pt.s.logNewStreams
		streamTagsCanonicals := lr.streamTagsCanonicals

		// Sort pendingRows by streamID, so duplicates become adjacent
		// and can be skipped in a single pass below.
		sort.Slice(pendingRows, func(i, j int) bool {
			return streamIDs[pendingRows[i]].less(&streamIDs[pendingRows[j]])
		})
		for i, rowIdx := range pendingRows {
			streamID := &streamIDs[rowIdx]
			if i > 0 && streamIDs[pendingRows[i-1]].equal(streamID) {
				continue
			}
			if pt.hasStreamIDInCache(streamID) {
				continue
			}
			if !pt.idb.hasStreamID(streamID) {
				streamTagsCanonical := streamTagsCanonicals[rowIdx]
				pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
				if logNewStreams {
					pt.logNewStream(streamTagsCanonical, lr.rows[rowIdx])
				}
			}
			pt.putStreamIDToCache(streamID)
		}
	}

	// Add rows to datadb
	pt.ddb.mustAddRows(lr)

	if pt.s.logIngestedRows {
		pt.logIngestedRows(lr)
	}
}

func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
	streamTags := getStreamTagsString(streamTagsCanonical)
	rf := RowFormatter(fields)
	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, streamTags, &rf)
}

func (pt *partition) logIngestedRows(lr *LogRows) {
	var rf RowFormatter
	for i, fields := range lr.rows {
		tf := TimeFormatter(lr.timestamps[i])
		streamTags := getStreamTagsString(lr.streamTagsCanonicals[i])
		rf = append(rf[:0], fields...)
		rf = append(rf, Field{
			Name:  "_time",
			Value: tf.String(),
		})
		rf = append(rf, Field{
			Name:  "_stream",
			Value: streamTags,
		})
		sort.Slice(rf, func(i, j int) bool {
			return rf[i].Name < rf[j].Name
		})
		logger.Infof("partition %s: new log entry %s", pt.path, &rf)
	}
}
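// The following is an editor's sketch (not original code) of the cache-aside
// pattern implemented by appendStreamTagsByStreamID below: consult the
// streamTagsCache first, fall back to indexdb on a miss, then populate the
// cache with whatever the slow path returned. The helper names are
// hypothetical:
//
//	if v := cacheGet(dst, key); len(v) > len(dst) {
//		return v // fast path: cache hit
//	}
//	v := indexdbLookup(dst, sid) // slow path: read from indexdb
//	if len(v) > len(dst) {
//		cachePut(key, v[len(dst):]) // remember for next time
//	}
//	return v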
// appendStreamTagsByStreamID appends the canonical representation of stream tags
// for the given sid to dst and returns the result.
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	// Search for the StreamTags in the cache.
	key := bbPool.Get()
	defer bbPool.Put(key)

	// There is no need to put the partition name into the key here,
	// since StreamTags are uniquely identified by streamID.
	key.B = sid.marshal(key.B)

	dstLen := len(dst)
	dst = pt.s.streamTagsCache.GetBig(dst, key.B)
	if len(dst) > dstLen {
		// Fast path - the StreamTags have been found in the cache.
		return dst
	}

	// Slow path - search for the StreamTags in idb.
	dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
	if len(dst) > dstLen {
		// Store the found StreamTags in the cache.
		pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:])
	}
	return dst
}

func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
	var result [1]byte
	bb := bbPool.Get()
	bb.B = pt.marshalStreamIDCacheKey(bb.B, sid)
	value := pt.s.streamIDCache.Get(result[:0], bb.B)
	bbPool.Put(bb)

	return bytes.Equal(value, okValue)
}

func (pt *partition) putStreamIDToCache(sid *streamID) {
	bb := bbPool.Get()
	bb.B = pt.marshalStreamIDCacheKey(bb.B, sid)
	pt.s.streamIDCache.Set(bb.B, okValue)
	bbPool.Put(bb)
}

func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte {
	dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(pt.name))
	dst = sid.marshal(dst)
	return dst
}

var okValue = []byte("1")

// debugFlush makes sure that all the recently ingested data becomes searchable.
func (pt *partition) debugFlush() {
	pt.ddb.debugFlush()
	pt.idb.debugFlush()
}

func (pt *partition) updateStats(ps *PartitionStats) {
	pt.ddb.updateStats(&ps.DatadbStats)
	pt.idb.updateStats(&ps.IndexdbStats)
}
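// Editor's note: updateStats accumulates into the passed PartitionStats
// rather than overwriting it, so stats for several partitions can be
// aggregated into a single value. A minimal sketch, assuming a hypothetical
// partitions slice holding the open partitions:
//
//	var ps PartitionStats
//	for _, pt := range partitions {
//		pt.updateStats(&ps)
//	}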