VictoriaMetrics/lib/logstorage/partition.go
Aliaksandr Valialkin 7b33a27874
lib/logstorage: follow-up for 8a23d08c21
- Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes
  directly inside the Storage.IsReadOnly(). This should work fast in most cases.
  This simplifies the logic at lib/storage.

- Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since
  it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes.
  The background merge logic uses another mechanism for determining whether there is enough
  disk space for the merge - it reserves the needed disk space before the merge
  and releases it after the merge. This prevents out-of-disk-space errors during background merges.

- Properly handle corner cases for flushing in-memory data to disk when the storage
  enters read-only mode. This is better than losing the in-memory data.

- Bring back Storage.MustAddRows() in place of Storage.AddRows(),
  since the only case when AddRows() can return error is when the storage is in read-only mode.
  This case must be handled by the caller by calling Storage.IsReadOnly()
  before adding rows to the storage.
  This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle
  errors returned by Storage.AddRows().

- Properly store parsed logs to Storage if parts of the request contain invalid log lines.
  Previously the parsed logs could be lost in this case.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
2023-10-02 16:52:23 +02:00

223 lines
5.9 KiB
Go

package logstorage
import (
"bytes"
"path/filepath"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// PartitionStats contains stats for the partition.
//
// It embeds the stats of both partition components; partition.updateStats()
// passes each embedded struct to the corresponding component for updating.
type PartitionStats struct {
// DatadbStats is handed to the partition's datadb in updateStats().
DatadbStats
// IndexdbStats is handed to the partition's indexdb in updateStats().
IndexdbStats
}
// partition bundles the indexdb and the datadb, which are stored
// under a single partition directory and belong to the parent Storage.
//
// A partition is created on disk with mustCreatePartition(), opened with mustOpenPartition()
// and closed with mustClosePartition(), which resets all the fields below.
type partition struct {
// s is the parent storage for the partition
s *Storage
// path is the path to the partition directory
path string
// name is the partition name. It is basically the directory name obtained from path.
// It is used for creating keys for partition caches.
name string
// idb is indexdb used for the given partition
idb *indexdb
// ddb is the datadb used for the given partition
ddb *datadb
}
// mustCreatePartition initializes a new partition directory at the given path.
//
// The directory itself is created first (fs.MustMkdirFailIfExist is used for that),
// then indexdb and datadb are created in their own sub-directories of path.
//
// Open the created partition with mustOpenPartition() after it has been created.
//
// Remove the created partition with mustDeletePartition() when it is no longer needed.
func mustCreatePartition(path string) {
	fs.MustMkdirFailIfExist(path)

	// indexdb goes first, datadb goes second.
	mustCreateIndexdb(filepath.Join(path, indexdbDirname))
	mustCreateDatadb(filepath.Join(path, datadbDirname))
}
// mustDeletePartition deletes partition at the given path.
//
// The partition directory is removed via fs.MustRemoveAll().
//
// The partition must be closed with mustClosePartition() before deleting it.
func mustDeletePartition(path string) {
fs.MustRemoveAll(path)
}
// mustOpenPartition opens the partition stored at the given path on behalf of the Storage s.
//
// The partition name is the last element of path.
//
// Call mustClosePartition() on the returned partition when it is no longer needed.
func mustOpenPartition(s *Storage, path string) *partition {
	partitionName := filepath.Base(path)

	pt := &partition{
		s:    s,
		path: path,
		name: partitionName,
	}

	// indexdb is opened before datadb, so pt.idb is already set
	// at the moment pt is passed to mustOpenDatadb().
	pt.idb = mustOpenIndexdb(filepath.Join(path, indexdbDirname), partitionName, s)
	pt.ddb = mustOpenDatadb(pt, filepath.Join(path, datadbDirname), s.flushInterval)

	return pt
}
// mustClosePartition closes both components of pt and resets all its fields.
//
// pt must not be used by anybody at the time of the call - it is the caller's
// responsibility to guarantee this.
//
// After the partition is closed it may be removed from disk via mustDeletePartition().
func mustClosePartition(pt *partition) {
	// indexdb is closed and dropped first.
	mustCloseIndexdb(pt.idb)
	pt.idb = nil

	// datadb is closed only after pt.idb has been reset.
	mustCloseDatadb(pt.ddb)

	// Reset the remaining fields, so the closed partition holds no stale references.
	pt.ddb, pt.name, pt.path, pt.s = nil, "", "", nil
}
// mustAddRows registers the streams from lr, which are missing in the stream id cache,
// in the partition's indexdb and then adds all the rows from lr to the partition's datadb.
//
// The ingested rows are additionally logged when pt.s.logIngestedRows is set.
func (pt *partition) mustAddRows(lr *LogRows) {
	streamIDs := lr.streamIDs

	// Pass 1: collect indexes of rows whose stream id is missing in the cache.
	// A row is skipped if its stream id equals the stream id of the previously collected row.
	var uncachedRows []int
	for rowIdx := range lr.timestamps {
		sid := &streamIDs[rowIdx]
		if pt.hasStreamIDInCache(sid) {
			continue
		}
		if n := len(uncachedRows); n > 0 && streamIDs[uncachedRows[n-1]].equal(sid) {
			continue
		}
		uncachedRows = append(uncachedRows, rowIdx)
	}

	// Pass 2: register the collected streams in indexdb.
	if len(uncachedRows) > 0 {
		shouldLogNewStreams := pt.s.logNewStreams
		tagsByRow := lr.streamTagsCanonicals

		// Sort by stream id, so rows with identical stream ids become adjacent.
		sort.Slice(uncachedRows, func(a, b int) bool {
			return streamIDs[uncachedRows[a]].less(&streamIDs[uncachedRows[b]])
		})

		for pos, rowIdx := range uncachedRows {
			sid := &streamIDs[rowIdx]

			// Skip duplicates of the previous entry and the stream ids, which are already in the cache.
			// NOTE(review): the repeated cache check presumably covers entries added
			// to the cache after pass 1 (e.g. by a concurrent caller) - confirm.
			if (pos > 0 && streamIDs[uncachedRows[pos-1]].equal(sid)) || pt.hasStreamIDInCache(sid) {
				continue
			}

			if !pt.idb.hasStreamID(sid) {
				tags := tagsByRow[rowIdx]
				pt.idb.mustRegisterStream(sid, tags)
				if shouldLogNewStreams {
					pt.logNewStream(tags, lr.rows[rowIdx])
				}
			}
			pt.putStreamIDToCache(sid)
		}
	}

	// Store the rows themselves in datadb.
	pt.ddb.mustAddRows(lr)

	if pt.s.logIngestedRows {
		pt.logIngestedRows(lr)
	}
}
// logNewStream writes an info message about the newly seen stream to the log.
//
// streamTagsCanonical is converted to a string with getStreamTagsString(),
// while fields of the log entry are formatted via RowFormatter.
func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
	tagsStr := getStreamTagsString(streamTagsCanonical)
	formatter := RowFormatter(fields)
	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, tagsStr, &formatter)
}
// logIngestedRows writes an info message to the log for every row in lr.
func (pt *partition) logIngestedRows(lr *LogRows) {
	for rowIdx := range lr.rows {
		logger.Infof("partition %s: new log entry %s", pt.path, lr.GetRowString(rowIdx))
	}
}
// appendStreamTagsByStreamID appends the canonical representation of stream tags for sid to dst
// and returns the resulting buffer.
//
// The tags are looked up in pt.s.streamTagsCache first and then in indexdb.
// dst is returned without any additions if neither lookup appends anything to it.
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	// The cache key consists of the marshaled sid only.
	// The partition name isn't needed there, since StreamTags is uniquely identified by streamID.
	cacheKey := bbPool.Get()
	defer bbPool.Put(cacheKey)
	cacheKey.B = sid.marshal(cacheKey.B)

	prefixLen := len(dst)

	// Fast path - try the cache.
	if dst = pt.s.streamTagsCache.GetBig(dst, cacheKey.B); len(dst) > prefixLen {
		return dst
	}

	// Slow path - ask indexdb.
	dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
	if len(dst) <= prefixLen {
		// Nothing has been appended, so there is nothing to put into the cache.
		return dst
	}

	// Remember only the newly appended part of dst in the cache.
	pt.s.streamTagsCache.SetBig(cacheKey.B, dst[prefixLen:])
	return dst
}
// hasStreamIDInCache returns true if pt.s.streamIDCache contains okValue
// under the key built by marshalStreamIDCacheKey() for sid.
func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
	key := bbPool.Get()
	key.B = pt.marshalStreamIDCacheKey(key.B, sid)

	// The cached value is read into a small on-stack buffer.
	var buf [1]byte
	found := bytes.Equal(pt.s.streamIDCache.Get(buf[:0], key.B), okValue)

	bbPool.Put(key)
	return found
}
// putStreamIDToCache stores okValue in pt.s.streamIDCache
// under the key built by marshalStreamIDCacheKey() for sid.
func (pt *partition) putStreamIDToCache(sid *streamID) {
	key := bbPool.Get()
	defer bbPool.Put(key)

	key.B = pt.marshalStreamIDCacheKey(key.B, sid)
	pt.s.streamIDCache.Set(key.B, okValue)
}
// marshalStreamIDCacheKey appends the streamIDCache key for sid to dst and returns the result.
//
// The key consists of the partition name marshaled with encoding.MarshalBytes(),
// followed by the marshaled sid.
func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte {
	partitionName := bytesutil.ToUnsafeBytes(pt.name)
	return sid.marshal(encoding.MarshalBytes(dst, partitionName))
}
// okValue is the value stored in streamIDCache by putStreamIDToCache();
// hasStreamIDInCache() compares the cached value against it.
var okValue = []byte("1")
// debugFlush makes sure that all the recently ingested data becomes searchable
//
// It calls debugFlush() on datadb first and on indexdb after that.
func (pt *partition) debugFlush() {
pt.ddb.debugFlush()
pt.idb.debugFlush()
}
// updateStats updates ps with the stats for pt.
//
// The embedded DatadbStats is passed to the partition's datadb,
// while the embedded IndexdbStats is passed to the partition's indexdb.
func (pt *partition) updateStats(ps *PartitionStats) {
pt.ddb.updateStats(&ps.DatadbStats)
pt.idb.updateStats(&ps.IndexdbStats)
}