VictoriaMetrics/lib/logstorage/log_rows.go

306 lines
6.8 KiB
Go
Raw Normal View History

package logstorage
import (
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// LogRows holds a set of rows needed for Storage.MustAddRows
//
// LogRows must be obtained via GetLogRows()
type LogRows struct {
// buf holds all the bytes referred by items in LogRows
buf []byte
// fieldsBuf holds all the fields referred by items in LogRows
fieldsBuf []Field
// streamIDs holds streamIDs for rows added to LogRows
streamIDs []streamID
// streamTagsCanonicals holds streamTagsCanonical entries for rows added to LogRows
streamTagsCanonicals [][]byte
// timestamps holds stimestamps for rows added to LogRows
timestamps []int64
// rows holds fields for rows atted to LogRows.
rows [][]Field
// sf is a helper for sorting fields in every added row
sf sortedFields
// streamFields contains names for stream fields
streamFields map[string]struct{}
// ignoreFields contains names for log fields, which must be skipped during data ingestion
ignoreFields map[string]struct{}
}
type sortedFields []Field
func (sf *sortedFields) Len() int {
return len(*sf)
}
func (sf *sortedFields) Less(i, j int) bool {
a := *sf
return a[i].Name < a[j].Name
}
func (sf *sortedFields) Swap(i, j int) {
a := *sf
a[i], a[j] = a[j], a[i]
}
// RowFormatter implementes fmt.Stringer for []Field aka a single log row
type RowFormatter []Field
// String returns user-readable representation for rf
func (rf *RowFormatter) String() string {
b := append([]byte{}, '{')
fields := *rf
if len(fields) > 0 {
b = append(b, fields[0].String()...)
fields = fields[1:]
for _, field := range fields {
b = append(b, ',')
b = append(b, field.String()...)
}
}
b = append(b, '}')
return string(b)
}
// Reset resets lr with all its settings.
//
// Call ResetKeepSettings() for resetting lr without resetting its settings.
func (lr *LogRows) Reset() {
lr.ResetKeepSettings()
sfs := lr.streamFields
for k := range sfs {
delete(sfs, k)
}
ifs := lr.ignoreFields
for k := range ifs {
delete(ifs, k)
}
}
// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
func (lr *LogRows) ResetKeepSettings() {
lr.buf = lr.buf[:0]
fb := lr.fieldsBuf
for i := range fb {
fb[i].Reset()
}
lr.fieldsBuf = fb[:0]
sids := lr.streamIDs
for i := range sids {
sids[i].reset()
}
lr.streamIDs = sids[:0]
sns := lr.streamTagsCanonicals
for i := range sns {
sns[i] = nil
}
lr.streamTagsCanonicals = sns[:0]
lr.timestamps = lr.timestamps[:0]
rows := lr.rows
for i := range rows {
rows[i] = nil
}
lr.rows = rows[:0]
lr.sf = nil
}
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (lr *LogRows) NeedFlush() bool {
return len(lr.buf) > (maxUncompressedBlockSize/8)*7
}
// MustAdd adds a log entry with the given args to lr.
//
// It is OK to modify the args after returning from the function,
// since lr copies all the args to internal data.
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
// Compose StreamTags from fields according to lr.streamFields
sfs := lr.streamFields
st := GetStreamTags()
for i := range fields {
f := &fields[i]
if _, ok := sfs[f.Name]; ok {
st.Add(f.Name, f.Value)
}
}
// Marshal StreamTags
bb := bbPool.Get()
bb.B = st.MarshalCanonical(bb.B)
PutStreamTags(st)
// Calculate the id for the StreamTags
var sid streamID
sid.tenantID = tenantID
sid.id = hash128(bb.B)
// Store the row
lr.mustAddInternal(sid, timestamp, fields, bb.B)
bbPool.Put(bb)
}
func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) {
buf := lr.buf
bufLen := len(buf)
buf = append(buf, streamTagsCanonical...)
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:])
lr.streamIDs = append(lr.streamIDs, sid)
lr.timestamps = append(lr.timestamps, timestamp)
// Store all the fields
ifs := lr.ignoreFields
fb := lr.fieldsBuf
fieldsLen := len(fb)
for i := range fields {
f := &fields[i]
if _, ok := ifs[f.Name]; ok {
// Skip fields from the ifs map
continue
}
if f.Value == "" {
// Skip fields without values
continue
}
fb = append(fb, Field{})
dstField := &fb[len(fb)-1]
bufLen = len(buf)
if f.Name != "_msg" {
buf = append(buf, f.Name...)
}
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, f.Value...)
dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:])
}
lr.sf = fb[fieldsLen:]
sort.Sort(&lr.sf)
lr.rows = append(lr.rows, lr.sf)
lr.fieldsBuf = fb
lr.buf = buf
}
// GetRowString returns string representation of the row with the given idx.
func (lr *LogRows) GetRowString(idx int) string {
tf := TimeFormatter(lr.timestamps[idx])
streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
var rf RowFormatter
rf = append(rf[:0], lr.rows[idx]...)
rf = append(rf, Field{
Name: "_time",
Value: tf.String(),
})
rf = append(rf, Field{
Name: "_stream",
Value: streamTags,
})
sort.Slice(rf, func(i, j int) bool {
return rf[i].Name < rf[j].Name
})
return rf.String()
}
// GetLogRows returns LogRows from the pool for the given streamFields.
//
// streamFields is a set of field names, which must be associated with the stream.
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-20 23:21:47 +00:00
// ignoreFields is a set of field names, which must be ignored during data ingestion.
//
// Return back it to the pool with PutLogRows() when it is no longer needed.
func GetLogRows(streamFields, ignoreFields []string) *LogRows {
v := logRowsPool.Get()
if v == nil {
v = &LogRows{}
}
lr := v.(*LogRows)
// Initialize streamFields
sfs := lr.streamFields
if sfs == nil {
sfs = make(map[string]struct{}, len(streamFields))
lr.streamFields = sfs
}
for _, f := range streamFields {
sfs[f] = struct{}{}
}
// Initialize ignoreFields
ifs := lr.ignoreFields
if ifs == nil {
ifs = make(map[string]struct{}, len(ignoreFields))
lr.ignoreFields = ifs
}
for _, f := range ignoreFields {
if f != "" {
ifs[f] = struct{}{}
}
}
return lr
}
// PutLogRows returns lr to the pool.
func PutLogRows(lr *LogRows) {
lr.Reset()
logRowsPool.Put(lr)
}
var logRowsPool sync.Pool
// Len returns the number of items in lr.
func (lr *LogRows) Len() int {
return len(lr.streamIDs)
}
// Less returns true if (streamID, timestamp) for row i is smaller than the (streamID, timestamp) for row j
func (lr *LogRows) Less(i, j int) bool {
a := &lr.streamIDs[i]
b := &lr.streamIDs[j]
if !a.equal(b) {
return a.less(b)
}
return lr.timestamps[i] < lr.timestamps[j]
}
// Swap swaps rows i and j in lr.
func (lr *LogRows) Swap(i, j int) {
a := &lr.streamIDs[i]
b := &lr.streamIDs[j]
*a, *b = *b, *a
tsA, tsB := &lr.timestamps[i], &lr.timestamps[j]
*tsA, *tsB = *tsB, *tsA
snA, snB := &lr.streamTagsCanonicals[i], &lr.streamTagsCanonicals[j]
*snA, *snB = *snB, *snA
fieldsA, fieldsB := &lr.rows[i], &lr.rows[j]
*fieldsA, *fieldsB = *fieldsB, *fieldsA
}