mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
304 lines
6.8 KiB
Go
304 lines
6.8 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
)
|
|
|
|
// LogRows holds a set of rows needed for Storage.MustAddRows
|
|
//
|
|
// LogRows must be obtained via GetLogRows()
|
|
type LogRows struct {
|
|
// buf holds all the bytes referred by items in LogRows
|
|
buf []byte
|
|
|
|
// fieldsBuf holds all the fields referred by items in LogRows
|
|
fieldsBuf []Field
|
|
|
|
// streamIDs holds streamIDs for rows added to LogRows
|
|
streamIDs []streamID
|
|
|
|
// streamTagsCanonicals holds streamTagsCanonical entries for rows added to LogRows
|
|
streamTagsCanonicals [][]byte
|
|
|
|
// timestamps holds stimestamps for rows added to LogRows
|
|
timestamps []int64
|
|
|
|
// rows holds fields for rows atted to LogRows.
|
|
rows [][]Field
|
|
|
|
// sf is a helper for sorting fields in every added row
|
|
sf sortedFields
|
|
|
|
// streamFields contains names for stream fields
|
|
streamFields map[string]struct{}
|
|
|
|
// ignoreFields contains names for log fields, which must be skipped during data ingestion
|
|
ignoreFields map[string]struct{}
|
|
}
|
|
|
|
type sortedFields []Field
|
|
|
|
func (sf *sortedFields) Len() int {
|
|
return len(*sf)
|
|
}
|
|
|
|
func (sf *sortedFields) Less(i, j int) bool {
|
|
a := *sf
|
|
return a[i].Name < a[j].Name
|
|
}
|
|
|
|
func (sf *sortedFields) Swap(i, j int) {
|
|
a := *sf
|
|
a[i], a[j] = a[j], a[i]
|
|
}
|
|
|
|
// RowFormatter implementes fmt.Stringer for []Field aka a single log row
|
|
type RowFormatter []Field
|
|
|
|
// String returns user-readable representation for rf
|
|
func (rf *RowFormatter) String() string {
|
|
b := append([]byte{}, '{')
|
|
|
|
fields := *rf
|
|
if len(fields) > 0 {
|
|
b = append(b, fields[0].String()...)
|
|
fields = fields[1:]
|
|
for _, field := range fields {
|
|
b = append(b, ',')
|
|
b = append(b, field.String()...)
|
|
}
|
|
}
|
|
|
|
b = append(b, '}')
|
|
return string(b)
|
|
}
|
|
|
|
// Reset resets lr with all its settings.
|
|
//
|
|
// Call ResetKeepSettings() for resetting lr without resetting its settings.
|
|
func (lr *LogRows) Reset() {
|
|
lr.ResetKeepSettings()
|
|
|
|
sfs := lr.streamFields
|
|
for k := range sfs {
|
|
delete(sfs, k)
|
|
}
|
|
|
|
ifs := lr.ignoreFields
|
|
for k := range ifs {
|
|
delete(ifs, k)
|
|
}
|
|
}
|
|
|
|
// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
|
|
func (lr *LogRows) ResetKeepSettings() {
|
|
lr.buf = lr.buf[:0]
|
|
|
|
fb := lr.fieldsBuf
|
|
for i := range fb {
|
|
fb[i].Reset()
|
|
}
|
|
lr.fieldsBuf = fb[:0]
|
|
|
|
sids := lr.streamIDs
|
|
for i := range sids {
|
|
sids[i].reset()
|
|
}
|
|
lr.streamIDs = sids[:0]
|
|
|
|
sns := lr.streamTagsCanonicals
|
|
for i := range sns {
|
|
sns[i] = nil
|
|
}
|
|
lr.streamTagsCanonicals = sns[:0]
|
|
|
|
lr.timestamps = lr.timestamps[:0]
|
|
|
|
rows := lr.rows
|
|
for i := range rows {
|
|
rows[i] = nil
|
|
}
|
|
lr.rows = rows[:0]
|
|
|
|
lr.sf = nil
|
|
}
|
|
|
|
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
|
|
func (lr *LogRows) NeedFlush() bool {
|
|
return len(lr.buf) > (maxUncompressedBlockSize/8)*7
|
|
}
|
|
|
|
// MustAdd adds a log entry with the given args to lr.
|
|
//
|
|
// It is OK to modify the args after returning from the function,
|
|
// since lr copies all the args to internal data.
|
|
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
|
|
// Compose StreamTags from fields according to lr.streamFields
|
|
sfs := lr.streamFields
|
|
st := GetStreamTags()
|
|
for i := range fields {
|
|
f := &fields[i]
|
|
if _, ok := sfs[f.Name]; ok {
|
|
st.Add(f.Name, f.Value)
|
|
}
|
|
}
|
|
|
|
// Marshal StreamTags
|
|
bb := bbPool.Get()
|
|
bb.B = st.MarshalCanonical(bb.B)
|
|
PutStreamTags(st)
|
|
|
|
// Calculate the id for the StreamTags
|
|
var sid streamID
|
|
sid.tenantID = tenantID
|
|
sid.id = hash128(bb.B)
|
|
|
|
// Store the row
|
|
lr.mustAddInternal(sid, timestamp, fields, bb.B)
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) {
|
|
buf := lr.buf
|
|
bufLen := len(buf)
|
|
buf = append(buf, streamTagsCanonical...)
|
|
|
|
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:])
|
|
lr.streamIDs = append(lr.streamIDs, sid)
|
|
lr.timestamps = append(lr.timestamps, timestamp)
|
|
|
|
// Store all the fields
|
|
ifs := lr.ignoreFields
|
|
fb := lr.fieldsBuf
|
|
fieldsLen := len(fb)
|
|
for i := range fields {
|
|
f := &fields[i]
|
|
|
|
if _, ok := ifs[f.Name]; ok {
|
|
// Skip fields from the ifs map
|
|
continue
|
|
}
|
|
if f.Value == "" {
|
|
// Skip fields without values
|
|
continue
|
|
}
|
|
|
|
fb = append(fb, Field{})
|
|
dstField := &fb[len(fb)-1]
|
|
|
|
bufLen = len(buf)
|
|
if f.Name != "_msg" {
|
|
buf = append(buf, f.Name...)
|
|
}
|
|
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
|
|
|
|
bufLen = len(buf)
|
|
buf = append(buf, f.Value...)
|
|
dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:])
|
|
}
|
|
lr.sf = fb[fieldsLen:]
|
|
sort.Sort(&lr.sf)
|
|
lr.rows = append(lr.rows, lr.sf)
|
|
|
|
lr.fieldsBuf = fb
|
|
lr.buf = buf
|
|
}
|
|
|
|
// GetRowString returns string representation of the row with the given idx.
|
|
func (lr *LogRows) GetRowString(idx int) string {
|
|
tf := TimeFormatter(lr.timestamps[idx])
|
|
streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
|
|
var rf RowFormatter
|
|
rf = append(rf[:0], lr.rows[idx]...)
|
|
rf = append(rf, Field{
|
|
Name: "_time",
|
|
Value: tf.String(),
|
|
})
|
|
rf = append(rf, Field{
|
|
Name: "_stream",
|
|
Value: streamTags,
|
|
})
|
|
sort.Slice(rf, func(i, j int) bool {
|
|
return rf[i].Name < rf[j].Name
|
|
})
|
|
return rf.String()
|
|
}
|
|
|
|
// GetLogRows returns LogRows from the pool for the given streamFields.
|
|
//
|
|
// streamFields is a set of field names, which must be associated with the stream.
|
|
//
|
|
// Return back it to the pool with PutLogRows() when it is no longer needed.
|
|
func GetLogRows(streamFields, ignoreFields []string) *LogRows {
|
|
v := logRowsPool.Get()
|
|
if v == nil {
|
|
v = &LogRows{}
|
|
}
|
|
lr := v.(*LogRows)
|
|
|
|
// Initialize streamFields
|
|
sfs := lr.streamFields
|
|
if sfs == nil {
|
|
sfs = make(map[string]struct{}, len(streamFields))
|
|
lr.streamFields = sfs
|
|
}
|
|
for _, f := range streamFields {
|
|
sfs[f] = struct{}{}
|
|
}
|
|
|
|
// Initialize ignoreFields
|
|
ifs := lr.ignoreFields
|
|
if ifs == nil {
|
|
ifs = make(map[string]struct{}, len(ignoreFields))
|
|
lr.ignoreFields = ifs
|
|
}
|
|
for _, f := range ignoreFields {
|
|
if f != "" {
|
|
ifs[f] = struct{}{}
|
|
}
|
|
}
|
|
|
|
return lr
|
|
}
|
|
|
|
// PutLogRows returns lr to the pool.
|
|
func PutLogRows(lr *LogRows) {
|
|
lr.Reset()
|
|
logRowsPool.Put(lr)
|
|
}
|
|
|
|
var logRowsPool sync.Pool
|
|
|
|
// Len returns the number of items in lr.
|
|
func (lr *LogRows) Len() int {
|
|
return len(lr.streamIDs)
|
|
}
|
|
|
|
// Less returns true if (streamID, timestamp) for row i is smaller than the (streamID, timestamp) for row j
|
|
func (lr *LogRows) Less(i, j int) bool {
|
|
a := &lr.streamIDs[i]
|
|
b := &lr.streamIDs[j]
|
|
if !a.equal(b) {
|
|
return a.less(b)
|
|
}
|
|
return lr.timestamps[i] < lr.timestamps[j]
|
|
}
|
|
|
|
// Swap swaps rows i and j in lr.
|
|
func (lr *LogRows) Swap(i, j int) {
|
|
a := &lr.streamIDs[i]
|
|
b := &lr.streamIDs[j]
|
|
*a, *b = *b, *a
|
|
|
|
tsA, tsB := &lr.timestamps[i], &lr.timestamps[j]
|
|
*tsA, *tsB = *tsB, *tsA
|
|
|
|
snA, snB := &lr.streamTagsCanonicals[i], &lr.streamTagsCanonicals[j]
|
|
*snA, *snB = *snB, *snA
|
|
|
|
fieldsA, fieldsB := &lr.rows[i], &lr.rows[j]
|
|
*fieldsA, *fieldsB = *fieldsB, *fieldsA
|
|
}
|